Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6CCA6200B84 for ; Tue, 20 Sep 2016 13:39:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6B68C160AD6; Tue, 20 Sep 2016 11:39:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9EFFA160AA9 for ; Tue, 20 Sep 2016 13:39:46 +0200 (CEST) Received: (qmail 11304 invoked by uid 500); 20 Sep 2016 11:39:45 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 11295 invoked by uid 99); 20 Sep 2016 11:39:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Sep 2016 11:39:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7C49AE0243; Tue, 20 Sep 2016 11:39:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <270a0152b32a4801b29144c8df948a2e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: conn Date: Tue, 20 Sep 2016 11:39:45 +0000 (UTC) archived-at: Tue, 20 Sep 2016 11:39:48 -0000 Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 6e72f5198 -> 4030ef886 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4030ef88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4030ef88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4030ef88 Branch: refs/heads/ignite-comm-opts2 Commit: 4030ef886043360e846926f2a72d5a8a81393e21 Parents: 6e72f51 Author: sboikov Authored: Tue Sep 20 11:49:10 2016 +0300 Committer: sboikov Committed: Tue Sep 20 14:37:06 2016 +0300 ---------------------------------------------------------------------- .../nio/GridAbstractCommunicationClient.java | 1 + .../util/nio/GridShmemCommunicationClient.java | 1 + .../util/nio/GridTcpNioCommunicationClient.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 244 ++++++++++++++----- ...eAtomicMessageRecovery10ConnectionsTest.java | 28 +++ .../IgniteCacheMessageRecoveryAbstractTest.java | 24 +- .../spi/GridTcpSpiForwardingSelfTest.java | 5 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 17 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 3 + ...cpCommunicationSpiMultithreadedSelfTest.java | 12 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + ...CommunicationRecoveryAckClosureSelfTest.java | 1 + .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 14 files changed, 268 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 37bc170..f2ab932 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -39,6 +39,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati private final int connIdx; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. */ protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 74d58b2..d941bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien private final MessageFormatter formatter; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 90f17b9..fcb40c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -45,8 +45,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie private final IgniteLogger log; /** - * @param ses Session. * @param connIdx Connection index. + * @param ses Session. * @param log Logger. */ public GridTcpNioCommunicationClient( http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 7d91120..2d1a2b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -47,6 +47,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; @@ -54,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; @@ -291,8 +293,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); - /** Connection ID meta for session. */ - private static final int CONN_ID_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Connection index meta for session. */ + private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -312,8 +314,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default socket write timeout. */ public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; - /** */ - public static final int DFLT_CONN_PER_NODE = 2; + /** Default connections per node. */ + public static final int DFLT_CONN_PER_NODE = 1; /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @@ -362,7 +364,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - ConnectionKey connId = ses.meta(CONN_ID_META); + ConnectionKey connId = ses.meta(CONN_IDX_META); if (connId != null) { UUID id = connId.nodeId(); @@ -442,7 +444,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } - final ConnectionKey old = ses.addMeta(CONN_ID_META, connKey); + final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey); assert old == null; @@ -526,7 +528,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut)); if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + @@ -558,7 +560,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { // The code below causes a race condition between shmem and TCP (see IGNITE-1294) boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut)); if (reserved) connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); @@ -568,7 +570,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onMessage(GridNioSession ses, Message msg) { - ConnectionKey connKey = ses.meta(CONN_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { assert ses.accepted() : ses; @@ -684,7 +686,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (createClient) { client = new GridTcpNioCommunicationClient(0, ses, log); - addNodeClient(node.id(), 0, client); + addNodeClient(node, 0, client); } return client; @@ -794,10 +796,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final boolean createClient; + /** */ + private final ConnectionKey connKey; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. * @param rmtNode Remote node. + * @param connKey Connection key. * @param msg Handshake message. * @param createClient If {@code true} creates NIO communication client.. * @param fut Connect future. @@ -805,12 +811,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, + ConnectionKey connKey, HandshakeMessage msg, boolean createClient, GridFutureAdapter fut) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; + this.connKey = connKey; this.msg = msg; this.createClient = createClient; this.fut = fut; @@ -839,7 +847,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connKey, fut); } } }; @@ -851,7 +859,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connKey, fut); } } } @@ -1105,10 +1113,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return locPortRange; } + /** + * TODO + * + * @param maxConnectionsPerNode + */ public void setConnectionsPerNode(int maxConnectionsPerNode) { this.connectionsPerNode = maxConnectionsPerNode; } + /** + * TODO + * + * @return + */ public int getConnectionsPerNode() { return connectionsPerNode; } @@ -1553,6 +1571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", msgsSent=").append(desc.sent()) .append(", msgsAckedByRmt=").append(desc.acked()) .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); } @@ -1578,9 +1598,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient[] clients0 = entry.getValue(); for (GridCommunicationClient client : clients0) { - sb.append(" [node=").append(nodeId) - .append(", client=").append(client) - .append(']').append(U.nl()); + if (client != null) { + sb.append(" [node=").append(nodeId) + .append(", client=").append(client) + .append(']').append(U.nl()); + } } } @@ -1606,6 +1628,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); + assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0"); + assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024"); if (!failureDetectionTimeoutEnabled()) { assertParameter(reconCnt > 0, "reconnectCnt > 0"); @@ -1691,6 +1715,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); + log.debug(configInfo("connectionsPerNode", connectionsPerNode)); if (failureDetectionTimeoutEnabled()) { log.debug(configInfo("connTimeout", connTimeout)); @@ -1809,7 +1834,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - ConnectionKey key = ses.meta(CONN_ID_META); + ConnectionKey key = ses.meta(CONN_IDX_META); return key != null ? formatter.reader(key.nodeId(), msgFactory) : null; } @@ -1824,7 +1849,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - ConnectionKey key = ses.meta(CONN_ID_META); + ConnectionKey key = ses.meta(CONN_IDX_META); return key != null ? formatter.writer(key.nodeId()) : null; } @@ -2003,8 +2028,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Force closing on stop (safety). for (GridCommunicationClient[] clients0 : clients.values()) { - for (GridCommunicationClient client : clients0) - client.forceClose(); + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } } // Clear resources. @@ -2031,8 +2058,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Force closing. for (GridCommunicationClient[] clients0 : clients.values()) { - for (GridCommunicationClient client : clients0) - client.forceClose(); + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } } getSpiContext().deregisterPorts(); @@ -2045,8 +2074,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.disconnected(reconnectFut); for (GridCommunicationClient[] clients0 : clients.values()) { - for (GridCommunicationClient client : clients0) - client.forceClose(); + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } } IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, @@ -2075,11 +2106,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (clients0 != null) { for (GridCommunicationClient client : clients0) { - if (log.isDebugEnabled()) - log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + - ", client=" + client + ']'); + if (client != null) { + if (log.isDebugEnabled()) + log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + + ", client=" + client + ']'); - client.forceClose(); + client.forceClose(); + } } } } @@ -2128,12 +2161,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sendMessage0(node, msg, ackC); } + /** */ + private final int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0); + + /** */ + private final ThreadLocal threadConnIdx = new ThreadLocal<>(); + + /** */ + private final AtomicInteger connIdx = new AtomicInteger(); + /** * TODO * @return */ private int connectionIndex() { - return ThreadLocalRandom.current().nextInt(connectionsPerNode); + switch (idxMode) { + case 0: { + return (int)(Thread.currentThread().getId() % connectionsPerNode); + } + + case 1: { + Integer threadIdx = threadConnIdx.get(); + + if (threadIdx != null) + return threadIdx; + + for (;;) { + int idx = connIdx.get(); + int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1; + + if (connIdx.compareAndSet(idx, nextIdx)) { + threadConnIdx.set(idx); + + return idx; + } + } + } + + case 2: + return ThreadLocalRandom.current().nextInt(connectionsPerNode); + + default: + throw new IgniteException("Invalid connection index mode: " + idxMode); + } } /** @@ -2215,48 +2285,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter for (;;) { GridCommunicationClient[] curClients = clients.get(nodeId); - if (curClients == null) + if (curClients == null || curClients[rmvClient.connectionIndex()] != rmvClient) return false; - if (curClients[rmvClient.connectionIndex()] == rmvClient) { - GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length); + GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length); - newClients[rmvClient.connectionIndex()] = null; + newClients[rmvClient.connectionIndex()] = null; - if (clients.replace(nodeId, curClients, newClients)) - return true; - } - else - return false; + if (clients.replace(nodeId, curClients, newClients)) + return true; } } /** - * @param nodeId Node ID. + * @param node Node. * @param connIdx Connection index. * @param addClient Client to add. */ - private void addNodeClient(UUID nodeId, int connIdx, GridCommunicationClient addClient) { + private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) { + assert connectionsPerNode > 0 : connectionsPerNode; + for (;;) { - GridCommunicationClient[] curClients = clients.get(nodeId); + GridCommunicationClient[] curClients = clients.get(node.id()); - assert curClients == null || curClients[connIdx] == null : "Client already created " + - "[node=" + nodeId + ", client=" + addClient + ", oldClient=" + curClients[connIdx] + ']'; + assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() + + ", connIdx=" + connIdx + + ", client=" + addClient + + ", oldClient=" + curClients[connIdx] + ']'; GridCommunicationClient[] newClients; if (curClients == null) { - newClients = new GridCommunicationClient[connectionsPerNode]; + newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1]; newClients[connIdx] = addClient; - if (clients.putIfAbsent(nodeId, newClients) == null) + if (clients.putIfAbsent(node.id(), newClients) == null) break; } else { newClients = Arrays.copyOf(curClients, curClients.length); newClients[connIdx] = addClient; - if (clients.replace(nodeId, curClients, newClients)) + if (clients.replace(node.id(), curClients, newClients)) break; } } @@ -2272,6 +2342,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException { assert node != null; + assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx; UUID nodeId = node.id(); @@ -2301,7 +2372,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client0 = createNioClient(node, connIdx); if (client0 != null) { - addNodeClient(nodeId, connIdx, client0); + addNodeClient(node, connIdx, client0); if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); @@ -2328,7 +2399,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw (Error)e; } finally { - clientFuts.remove(nodeId, fut); + clientFuts.remove(connKey, fut); } } else @@ -2525,7 +2596,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { - ConnectionKey id = ses.meta(CONN_ID_META); + ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { ClusterNode node = getSpiContext().node(id.nodeId); @@ -2666,7 +2737,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { Map meta = new HashMap<>(); - meta.put(CONN_ID_META, connKey); + meta.put(CONN_IDX_META, connKey); if (isSslEnabled()) { assert sslEngine != null; @@ -3085,8 +3156,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.join(commWorker, log); for (GridCommunicationClient[] clients0 : clients.values()) { - for (GridCommunicationClient client : clients0) - client.forceClose(); + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } } } @@ -3281,7 +3354,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - ConnectionKey connKey = ses.meta(CONN_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); return connKey != null ? formatter.writer(connKey.nodeId()) : null; } @@ -3297,7 +3370,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - ConnectionKey connKey = ses.meta(CONN_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null; } @@ -3384,6 +3457,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID nodeId = e.getKey(); for (GridCommunicationClient client : e.getValue()) { + if (client == null) + continue; + ClusterNode node = getSpiContext().node(nodeId); if (node == null) { @@ -3483,7 +3559,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * + * @param recoveryDescs Recovery descriptors to cleanup. */ private void cleanupRecovery(ConcurrentMap recoveryDescs) { Set left = null; @@ -3492,9 +3568,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (left != null && left.contains(e.getKey())) continue; - GridNioRecoveryDescriptor recoverySnd = e.getValue(); + GridNioRecoveryDescriptor recoveryDesc = e.getValue(); - if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { + if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) { if (left == null) left = new HashSet<>(); @@ -3506,10 +3582,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert !left.isEmpty(); for (ConnectionKey id : left) { - GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); + GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id); - if (recoverySnd != null && recoverySnd.onNodeLeft()) - recoveryDescs.remove(id); + if (recoveryDesc != null && recoveryDesc.onNodeLeft()) + recoveryDescs.remove(id, recoveryDesc); } } } @@ -3795,7 +3871,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (buf.remaining() < 33) return false; - buf.put(HANDSHAKE_MSG_TYPE); + buf.put(directType()); byte[] bytes = U.uuidToBytes(nodeId); @@ -3867,7 +3943,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param rcvCnt Number of received messages. * @param connIdx Connection index. */ - public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { + HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { super(nodeId, connectCnt, rcvCnt); this.connIdx = connIdx; } @@ -3883,6 +3959,32 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (!super.writeTo(buf, writer)) + return false; + + if (buf.remaining() < 4) + return false; + + buf.putInt(connIdx); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (!super.readFrom(buf, reader)) + return false; + + if (buf.remaining() < 4) + return false; + + connIdx = buf.getInt(); + + return true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(HandshakeMessage2.class, this); } @@ -4184,6 +4286,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ConnectionKey key = (ConnectionKey) o; + + return idx == key.idx && nodeId.equals(key.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + res = 31 * res + idx; + return res; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ConnectionKey.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java new file mode 100644 index 0000000..30fc9ef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +/** + * + */ +public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest { + /** {@inheritDoc} */ + @Override protected int connectionsPerNode() { + return 10; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index 0460a8f..1bfd727 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA commSpi.setSocketWriteTimeout(1000); commSpi.setSharedMemoryPort(-1); + commSpi.setConnectionsPerNode(connectionsPerNode()); cfg.setCommunicationSpi(commSpi); @@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA } /** + * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}. + */ + protected int connectionsPerNode() { + return TcpCommunicationSpi.DFLT_CONN_PER_NODE; + } + + /** * @return Cache atomicity mode. */ protected abstract CacheAtomicityMode atomicityMode(); @@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA static boolean closeSessions(Ignite ignite) throws Exception { TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi(); - Map clients = U.field(commSpi, "clients"); + Map clients = U.field(commSpi, "clients"); boolean closed = false; - for (GridCommunicationClient client : clients.values()) { - GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client; + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) { + GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client; - GridNioSession ses = client0.session(); + GridNioSession ses = client0.session(); - ses.close(); + ses.close(); - closed = true; + closed = true; + } + } } return closed; http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 652e47f..deda313 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { Map attrs = new HashMap<>(node.attributes()); attrs.remove(createSpiAttributeName(ATTR_PORT)); ((TcpDiscoveryNode)node).setAttributes(attrs); - return super.createTcpClient(node); + return super.createTcpClient(node, connIdx); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index c7f7ad4..bd66319 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -78,6 +78,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest clients = U.field(spi, "clients"); + final ConcurrentMap clients = U.field(spi, "clients"); assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return clients.isEmpty(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) + return false; + } + } + return true; } }, getTestTimeout()) : "Clients: " + clients; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index fb2dfd7..e0478da 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -361,6 +361,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest spi.setSocketWriteTimeout(1000); spi.setSocketSendBuffer(512); spi.setSocketReceiveBuffer(512); + spi.setConnectionsPerNode(1); return spi; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index e153fe2..9928d93 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -397,6 +397,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest