Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F2A47200B68 for ; Fri, 19 Aug 2016 17:00:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F10EA160AAB; Fri, 19 Aug 2016 15:00:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 745F5160A79 for ; Fri, 19 Aug 2016 17:00:44 +0200 (CEST) Received: (qmail 91726 invoked by uid 500); 19 Aug 2016 15:00:43 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 91717 invoked by uid 99); 19 Aug 2016 15:00:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 15:00:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 77894DFDCE; Fri, 19 Aug 2016 15:00:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <19b6453ab55b463e896e2d015387a9f0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-3220-1 Date: Fri, 19 Aug 2016 15:00:43 +0000 (UTC) archived-at: Fri, 19 Aug 2016 15:00:46 -0000 Repository: ignite Updated Branches: refs/heads/ignite-3220-1 [created] ea2b19fd6 ignite-3220-1 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ea2b19fd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ea2b19fd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ea2b19fd Branch: refs/heads/ignite-3220-1 Commit: ea2b19fd65cd2e6a2d4a00606e2819c27a9bbbe2 Parents: 0c860ec Author: sboikov Authored: Fri Aug 19 16:34:59 2016 +0300 Committer: sboikov Committed: Fri Aug 19 17:58:04 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoMessage.java | 4 + .../communication/tcp/TcpCommunicationSpi.java | 253 ++++++++++++------- .../spi/GridTcpSpiForwardingSelfTest.java | 4 +- 3 files changed, 163 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ea2b19fd/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index b28ced2..929d26e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -161,6 +161,10 @@ public class GridIoMessage implements Message { return skipOnTimeout; } + public int connectionIndex() { + return ordered ? topic.hashCode() : super.hashCode(); + } + /** * @return {@code True} if message is ordered, {@code false} otherwise. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ea2b19fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index cef85bb..37ab859 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -350,16 +351,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); + ConnectionId id = ses.meta(NODE_ID_META); if (id != null) { + GridCommunicationClient client = clients.get(id); + + if (client instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient) client).session() == ses) { + client.close(); + + clients.remove(id, client); + } + if (!stopping) { boolean reconnect = false; GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (recoveryData.nodeAlive(getSpiContext().node(id.id))) { if (!recoveryData.messagesFutures().isEmpty()) { reconnect = true; @@ -373,7 +383,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id, - ses, recoveryData, reconnect); @@ -383,7 +392,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter CommunicationListener lsnr0 = lsnr; if (lsnr0 != null) - lsnr0.onDisconnected(id); + lsnr0.onDisconnected(id.id); } } @@ -392,24 +401,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param msg Message. */ private void onFirstMessage(GridNioSession ses, Message msg) { - UUID sndId; + ConnectionId sndId; if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + sndId = new ConnectionId(-1, U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0)); else { assert msg instanceof HandshakeMessage : msg; - sndId = ((HandshakeMessage)msg).nodeId(); + sndId = new ConnectionId(((HandshakeMessage)msg).idx, ((HandshakeMessage)msg).nodeId()); } if (log.isDebugEnabled()) log.debug("Remote node ID received: " + sndId); - final UUID old = ses.addMeta(NODE_ID_META, sndId); + final ConnectionId old = ses.addMeta(NODE_ID_META, sndId); assert old == null; - final ClusterNode rmtNode = getSpiContext().node(sndId); + final ClusterNode rmtNode = getSpiContext().node(sndId.id); if (rmtNode == null) { if (log.isDebugEnabled()) @@ -455,7 +464,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter HandshakeMessage msg0 = (HandshakeMessage)msg; - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode, sndId); if (oldFut == null) { oldClient = clients.get(sndId); @@ -481,7 +490,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut, sndId)); if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + @@ -490,12 +499,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserved) { try { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient, sndId); fut.onDone(client); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(sndId, fut); } } } @@ -513,16 +522,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { // The code below causes a race condition between shmem and TCP (see IGNITE-1294) boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut, sndId)); if (reserved) - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient, sndId); } } } @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); + ConnectionId sndId = ses.meta(NODE_ID_META); if (sndId == null) { assert ses.accepted() : ses; @@ -594,7 +603,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else c = NOOP; - notifyListener(sndId, msg, c); + notifyListener(sndId.id, msg, c); } } @@ -613,7 +622,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClusterNode node, long rcvCnt, boolean sndRes, - boolean createClient) { + boolean createClient, + ConnectionId connId) { + assert node.id().equals(connId.id); + recovery.onHandshake(rcvCnt); ses.recoveryDescriptor(recovery); @@ -630,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (createClient) { client = new GridTcpNioCommunicationClient(ses, log); - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + GridCommunicationClient oldClient = clients.putIfAbsent(connId, client); assert oldClient == null : "Client already created [node=" + node + ", client=" + client + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; @@ -665,6 +677,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final boolean createClient; + private final ConnectionId connId; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. @@ -678,13 +692,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClusterNode rmtNode, HandshakeMessage msg, boolean createClient, - GridFutureAdapter fut) { + GridFutureAdapter fut, + ConnectionId connId) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; this.msg = msg; this.createClient = createClient; this.fut = fut; + this.connId = connId; } /** {@inheritDoc} */ @@ -696,7 +712,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient, connId); fut.onDone(client); } @@ -710,7 +726,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connId, fut); } } }; @@ -722,7 +738,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connId, fut); } } } @@ -807,7 +823,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ - private final ConcurrentMap clients = GridConcurrentFactory.newMap(); + private final ConcurrentMap clients = GridConcurrentFactory.newMap(); + + /** + * + */ + public static final class ConnectionId { + /** */ + private final int idx; + + /** */ + private final UUID id; + + /** + * @param idx Connection index. + * @param id Node ID. + */ + ConnectionId(int idx, UUID id) { + this.idx = idx; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + ConnectionId clientId = (ConnectionId) o; + + return idx == clientId.idx && id.equals(clientId.id); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = idx; + res = 31 * res + id.hashCode(); + return res; + } + } /** SPI listener. */ private volatile CommunicationListener lsnr; @@ -854,7 +907,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter }; /** Client connect futures. */ - private final ConcurrentMap> clientFuts = + private final ConcurrentMap> clientFuts = GridConcurrentFactory.newMap(); /** */ @@ -1625,9 +1678,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null; } }; @@ -1640,9 +1693,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null; } }; @@ -1879,16 +1932,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter void onNodeLeft(UUID nodeId) { assert nodeId != null; - GridCommunicationClient client = clients.get(nodeId); + for (Map.Entry client : clients.entrySet()) { + ConnectionId id = client.getKey(); - if (client != null) { - if (log.isDebugEnabled()) - log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + - ", client=" + client + ']'); + if (id.id.equals(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + + ", client=" + client + ']'); - client.forceClose(); + client.getValue().forceClose(); - clients.remove(nodeId, client); + clients.remove(id, client.getValue()); + } } } @@ -1936,6 +1991,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sendMessage0(node, msg, ackC); } + private final int CONNECTIONS_PER_NODE = 2; + /** * @param node Destination node. * @param msg Message to send. @@ -1963,11 +2020,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { GridCommunicationClient client = null; + ConnectionId id = new ConnectionId(((GridIoMessage)msg).connectionIndex() % CONNECTIONS_PER_NODE, node.id()); + try { boolean retry; do { - client = reserveClient(node); + client = reserveClient(node, id); UUID nodeId = null; @@ -1996,7 +2055,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw new IgniteSpiException("Failed to send message to remote node: " + node, e); } finally { - if (client != null && clients.remove(node.id(), client)) + if (client != null && clients.remove(id, client)) client.forceClose(); } } @@ -2009,13 +2068,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return The existing or just created client. * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException { + private GridCommunicationClient reserveClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { assert node != null; UUID nodeId = node.id(); while (true) { - GridCommunicationClient client = clients.get(nodeId); + GridCommunicationClient client = clients.get(id); if (client == null) { if (stopping) @@ -2024,17 +2083,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter fut = new ConnectFuture(); - GridFutureAdapter oldFut = clientFuts.putIfAbsent(nodeId, fut); + GridFutureAdapter oldFut = clientFuts.putIfAbsent(id, fut); if (oldFut == null) { try { - GridCommunicationClient client0 = clients.get(nodeId); + GridCommunicationClient client0 = clients.get(id); if (client0 == null) { - client0 = createNioClient(node); + client0 = createNioClient(node, id); if (client0 != null) { - GridCommunicationClient old = clients.put(nodeId, client0); + GridCommunicationClient old = clients.put(id, client0); assert old == null : "Client already created " + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; @@ -2042,7 +2101,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); - if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { + if (tcpClient.session().closeTime() > 0 && clients.remove(id, client0)) { if (log.isDebugEnabled()) log.debug("Session was closed after client creation, will retry " + "[node=" + node + ", client=" + client0 + ']'); @@ -2064,7 +2123,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw (Error)e; } finally { - clientFuts.remove(nodeId, fut); + clientFuts.remove(id, fut); } } else @@ -2076,7 +2135,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter continue; if (getSpiContext().node(nodeId) == null) { - if (clients.remove(nodeId, client)) + if (clients.remove(id, client)) client.forceClose(); throw new IgniteSpiException("Destination node is not in topology: " + node.id()); @@ -2087,7 +2146,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return client; else // Client has just been closed by idle worker. Help it and try again. - clients.remove(nodeId, client); + clients.remove(id, client); } } @@ -2096,7 +2155,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { + @Nullable protected GridCommunicationClient createNioClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2137,7 +2196,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node); + GridCommunicationClient client = createTcpClient(node, id); if (log.isDebugEnabled()) log.debug("TCP client created: " + client); @@ -2190,7 +2249,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null); + safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null, null); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2249,10 +2308,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { - UUID id = ses.meta(NODE_ID_META); + ConnectionId id = ses.meta(NODE_ID_META); if (id != null) { - ClusterNode node = getSpiContext().node(id); + ClusterNode node = getSpiContext().node(id.id); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2266,7 +2325,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log, msg); - getSpiContext().failNode(id, msg); + getSpiContext().failNode(id.id, msg); } } } @@ -2279,7 +2338,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { Collection rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -2347,7 +2406,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "(node left topology): " + node); } - GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node); + GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node, id); if (!recoveryDesc.reserve()) { U.closeQuiet(ch); @@ -2369,7 +2428,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine); + timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine, id); if (rcvCnt == -1) return null; @@ -2382,7 +2441,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { Map meta = new HashMap<>(); - meta.put(NODE_ID_META, node.id()); + meta.put(NODE_ID_META, id); if (isSslEnabled()) { assert sslEngine != null; @@ -2551,7 +2610,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - @Nullable SSLEngine ssl + @Nullable SSLEngine ssl, + ConnectionId id ) throws IgniteCheckedException { HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2633,12 +2693,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (recovery != null) { HandshakeMessage msg = new HandshakeMessage(locNode.id(), recovery.incrementConnectCount(), - recovery.received()); + recovery.received(), + id.idx); if (log.isDebugEnabled()) log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); - buf = ByteBuffer.allocate(33); + buf = ByteBuffer.allocate(37); buf.order(ByteOrder.nativeOrder()); @@ -2792,8 +2853,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node. * @return Recovery receive data for given node. */ - private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) { - ClientKey id = new ClientKey(node.id(), node.order()); + private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node, ConnectionId clientId) { + ClientKey id = new ClientKey(node.id(), node.order(), clientId.idx); GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); @@ -2850,18 +2911,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private static class ClientKey { /** */ - private UUID nodeId; + private final UUID nodeId; /** */ - private long order; + private final long order; + + /** */ + private final int idx; /** * @param nodeId Node ID. * @param order Node order. */ - private ClientKey(UUID nodeId, long order) { + private ClientKey(UUID nodeId, long order, int idx) { this.nodeId = nodeId; this.order = order; + this.idx = idx; } /** {@inheritDoc} */ @@ -2874,7 +2939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClientKey other = (ClientKey)obj; - return order == other.order && nodeId.equals(other.nodeId); + return idx == other.idx && order == other.order && nodeId.equals(other.nodeId); } @@ -2882,7 +2947,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public int hashCode() { int res = nodeId.hashCode(); - res = 31 * res + (int)(order ^ (order >>> 32)); + res = 31 * res + (int)(order ^ (order >>> 32)) + idx; return res; } @@ -2992,9 +3057,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null; } }; @@ -3008,9 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null; } }; @@ -3091,8 +3156,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processIdle() { cleanupRecovery(); - for (Map.Entry e : clients.entrySet()) { - UUID nodeId = e.getKey(); + for (Map.Entry e : clients.entrySet()) { + UUID nodeId = e.getKey().id; GridCommunicationClient client = e.getValue(); @@ -3104,7 +3169,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client.forceClose(); - clients.remove(nodeId, client); + clients.remove(e.getKey(), client); continue; } @@ -3112,7 +3177,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery = null; if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order(), e.getKey().idx)); if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); @@ -3146,7 +3211,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Closing idle node connection: " + nodeId); if (client.close() || client.closed()) - clients.remove(nodeId, client); + clients.remove(e.getKey(), client); } } } @@ -3187,12 +3252,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param sesInfo Disconnected session information. */ private void processDisconnect(DisconnectedSessionInfo sesInfo) { - GridCommunicationClient client = clients.get(sesInfo.nodeId); - - if (client instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses) - clients.remove(sesInfo.nodeId, client); - if (sesInfo.reconnect) { GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; @@ -3205,7 +3264,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - client = reserveClient(node); + GridCommunicationClient client = reserveClient(node, sesInfo.connId); client.release(); } @@ -3413,6 +3472,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private long connectCnt; + /** */ + private int idx; + /** * Default constructor required by {@link Message}. */ @@ -3425,13 +3487,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param connectCnt Connect count. * @param rcvCnt Number of received messages. */ - public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { + public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt, int idx) { assert nodeId != null; assert rcvCnt >= 0 : rcvCnt; this.nodeId = nodeId; this.connectCnt = connectCnt; this.rcvCnt = rcvCnt; + this.idx = idx; } /** @@ -3462,7 +3525,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < 33) + if (buf.remaining() < 37) return false; buf.put(HANDSHAKE_MSG_TYPE); @@ -3477,6 +3540,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf.putLong(connectCnt); + buf.putInt(idx); + return true; } @@ -3495,6 +3560,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectCnt = buf.getLong(); + idx = buf.getInt(); + return true; } @@ -3756,10 +3823,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private static class DisconnectedSessionInfo { /** */ - private final UUID nodeId; - - /** */ - private final GridNioSession ses; + private final ConnectionId connId; /** */ private final GridNioRecoveryDescriptor recoveryDesc; @@ -3768,17 +3832,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final boolean reconnect; /** - * @param nodeId Node ID. - * @param ses Session. + * @param connId Node ID. * @param recoveryDesc Recovery descriptor. * @param reconnect Reconnect flag. */ - public DisconnectedSessionInfo(UUID nodeId, - GridNioSession ses, + public DisconnectedSessionInfo(ConnectionId connId, @Nullable GridNioRecoveryDescriptor recoveryDesc, boolean reconnect) { - this.nodeId = nodeId; - this.ses = ses; + this.connId = connId; this.recoveryDesc = recoveryDesc; this.reconnect = reconnect; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ea2b19fd/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 652e47f..10cd247 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -111,14 +111,14 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId idx) throws IgniteCheckedException { Map attrs = new HashMap<>(node.attributes()); attrs.remove(createSpiAttributeName(ATTR_PORT)); ((TcpDiscoveryNode)node).setAttributes(attrs); - return super.createTcpClient(node); + return super.createTcpClient(node, idx); } };