ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [20/50] [abbrv] ignite git commit: ignite-3220 Use pair of connections (in/out) instead of single connection, possibility to configure mutiple connection pairs.
Date Mon, 10 Oct 2016 12:13:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0a20a2c..f7d51a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -53,6 +54,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
+import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
@@ -177,6 +180,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
  * <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
  * <li>Node local port number (see {@link #setLocalPort(int)})</li>
  * <li>Local port range (see {@link #setLocalPortRange(int)}</li>
+ * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
  * <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li>
  * <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li>
  * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
@@ -281,12 +285,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Default count of selectors for TCP server equals to
-     * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
+     * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}.
      */
-    public static final int DFLT_SELECTORS_CNT = Runtime.getRuntime().availableProcessors();
+    public static final int DFLT_SELECTORS_CNT = Math.min(8, Runtime.getRuntime().availableProcessors());
 
-    /** Node ID meta for session. */
-    private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+    /** Connection index meta for session. */
+    private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -306,6 +310,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default socket write timeout. */
     public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
+    /** Default connections per node. */
+    public static final int DFLT_CONN_PER_NODE = 1;
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
         @Override public void run() {
@@ -325,6 +332,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     private ConnectGateway connectGate;
 
+    /** */
+    private ConnectionPolicy connPlc;
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
@@ -350,41 +360,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
+                ConnectionKey connId = ses.meta(CONN_IDX_META);
 
-                if (id != null) {
-                    GridCommunicationClient client = clients.get(id);
+                if (connId != null) {
+                    UUID id = connId.nodeId();
 
-                    if (client instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient) client).session() == ses) {
-                        client.close();
+                    GridCommunicationClient[] nodeClients = clients.get(id);
 
-                        clients.remove(id, client);
+                    if (nodeClients != null) {
+                        for (GridCommunicationClient client : nodeClients) {
+                            if (client instanceof GridTcpNioCommunicationClient &&
+                                ((GridTcpNioCommunicationClient)client).session() == ses) {
+                                client.close();
+
+                                removeNodeClient(id, client);
+                            }
+                        }
                     }
 
                     if (!stopping) {
-                        boolean reconnect = false;
-
-                        GridNioRecoveryDescriptor recoveryData = ses.outRecoveryDescriptor();
-
-                        if (recoveryData != null) {
-                            if (recoveryData.nodeAlive(getSpiContext().node(id))) {
-                                if (!recoveryData.messagesFutures().isEmpty()) {
-                                    reconnect = true;
+                        GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
 
+                        if (outDesc != null) {
+                            if (outDesc.nodeAlive(getSpiContext().node(id))) {
+                                if (!outDesc.messagesFutures().isEmpty()) {
                                     if (log.isDebugEnabled())
                                         log.debug("Session was closed but there are unacknowledged messages, " +
-                                            "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+                                            "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
+
+                                    DisconnectedSessionInfo disconnectData =
+                                        new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+
+                                    commWorker.addProcessDisconnectRequest(disconnectData);
                                 }
                             }
                             else
-                                recoveryData.onNodeLeft();
+                                outDesc.onNodeLeft();
                         }
-
-                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
-                            reconnect);
-
-                        commWorker.addProcessDisconnectRequest(disconnectData);
                     }
 
                     CommunicationListener<Message> lsnr0 = lsnr;
@@ -401,21 +413,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             private void onFirstMessage(GridNioSession ses, Message msg) {
                 UUID sndId;
 
-                if (msg instanceof NodeIdMessage)
-                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                ConnectionKey connKey;
+
+                if (msg instanceof NodeIdMessage) {
+                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+                    connKey = new ConnectionKey(sndId, 0, -1);
+                }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
+                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+
                     sndId = ((HandshakeMessage)msg).nodeId();
+                    connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
                 if (log.isDebugEnabled())
                     log.debug("Remote node ID received: " + sndId);
 
-                final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
-                assert old == null;
-
                 final ClusterNode rmtNode = getSpiContext().node(sndId);
 
                 if (rmtNode == null) {
@@ -427,6 +442,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     return;
                 }
 
+                final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+                assert old == null;
+
                 ClusterNode locNode = getSpiContext().localNode();
 
                 if (ses.remoteAddress() == null)
@@ -436,17 +455,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                if (useTwoConnections(rmtNode)) {
-                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
+                if (useMultipleConnections(rmtNode)) {
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
-                    boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(),
-                        new ConnectClosureNew(ses, recoveryDesc, rmtNode));
+                    ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
+
+                    boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
 
                     if (reserve)
                         connectedNew(recoveryDesc, ses, true);
+                    else {
+                        if (c.failed) {
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            for (GridNioSession ses0 : nioSrvr.sessions()) {
+                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                                if (ses0.accepted() && key0 != null &&
+                                    key0.nodeId().equals(connKey.nodeId()) &&
+                                    key0.connectionIndex() == connKey.connectionIndex() &&
+                                    key0.connectCount() < connKey.connectCount())
+                                    ses0.close();
+                            }
+                        }
+                    }
                 }
                 else {
-                    GridCommunicationClient oldClient = clients.get(sndId);
+                    GridCommunicationClient[] curClients = clients.get(sndId);
+
+                    GridCommunicationClient oldClient = curClients != null ? curClients[0] : null;
 
                     boolean hasShmemClient = false;
 
@@ -470,12 +507,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
 
-                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
 
-                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode);
+                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
                     if (oldFut == null) {
-                        oldClient = clients.get(sndId);
+                        curClients = clients.get(sndId);
+
+                        oldClient = curClients != null ? curClients[0] : null;
 
                         if (oldClient != null) {
                             if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -498,7 +537,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
 
                         boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
 
                         if (log.isDebugEnabled())
                             log.debug("Received incoming connection from remote node " +
@@ -512,7 +551,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 fut.onDone(client);
                             }
                             finally {
-                                clientFuts.remove(rmtNode.id(), fut);
+                                clientFuts.remove(connKey, fut);
                             }
                         }
                     }
@@ -530,7 +569,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         else {
                             // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
                             boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
 
                             if (reserved)
                                 connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
@@ -540,9 +579,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                if (sndId == null) {
+                if (connKey == null) {
                     assert ses.accepted() : ses;
 
                     if (!connectGate.tryEnter()) {
@@ -570,9 +609,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+                                    ", connIdx=" + connKey.connectionIndex() +
                                     ", rcvCnt=" + msg0.received() + ']');
+                            }
 
                             recovery.ackReceived(msg0.received());
 
@@ -586,9 +627,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             long rcvCnt = recovery.onReceived();
 
                             if (rcvCnt % ackSndThreshold == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() +
+                                        ", connIdx=" + connKey.connectionIndex() +
                                         ", rcvCnt=" + rcvCnt + ']');
+                                }
 
                                 nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
 
@@ -598,25 +641,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
 
                     IgniteRunnable c;
-//
-//                    if (msgQueueLimit > 0) {
-//                        GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-//
-//                        if (tracker == null) {
-//                            GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
-//                                new GridNioMessageTracker(ses, msgQueueLimit));
-//
-//                            assert old == null;
-//                        }
-//
-//                        tracker.onMessageReceived();
-//
-//                        c = tracker;
-//                    }
-//                    else
+
+                    if (msgQueueLimit > 0) {
+                        GridNioMessageTracker tracker = ses.meta(TRACKER_META);
+
+                        if (tracker == null) {
+                            GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
+                                new GridNioMessageTracker(ses, msgQueueLimit));
+
+                            assert old == null;
+                        }
+
+                        tracker.onMessageReceived();
+
+                        c = tracker;
+                    }
+                    else
                         c = NOOP;
 
-                    notifyListener(sndId, msg, c);
+                    notifyListener(connKey.nodeId(), msg, c);
                 }
             }
 
@@ -624,7 +667,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
              * @param recovery Recovery descriptor.
              * @param ses Session.
              * @param node Node.
-             * @param rcvCnt Number of received messages..
+             * @param rcvCnt Number of received messages.
              * @param sndRes If {@code true} sends response for recovery handshake.
              * @param createClient If {@code true} creates NIO communication client.
              * @return Client.
@@ -645,17 +688,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (sndRes)
                     nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
 
-                recovery.connected();
+                recovery.onConnected();
 
                 GridTcpNioCommunicationClient client = null;
 
                 if (createClient) {
-                    client = new GridTcpNioCommunicationClient(ses, log);
-
-                    GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client);
+                    client = new GridTcpNioCommunicationClient(0, ses, log);
 
-                    assert oldClient == null : "Client already created [node=" + node + ", client=" + client +
-                        ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']';
+                    addNodeClient(node, 0, client);
                 }
 
                 return client;
@@ -675,7 +715,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (sndRes)
                     nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
 
-                recovery.connected();
+                recovery.onConnected();
             }
 
             /**
@@ -694,6 +734,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final ClusterNode rmtNode;
 
+                /** */
+                private boolean failed;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
@@ -709,6 +752,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
+                    failed = !success;
+
                     if (success) {
                         IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                             @Override public void apply(IgniteInternalFuture<?> msgFut) {
@@ -760,10 +805,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final boolean createClient;
 
+                /** */
+                private final ConnectionKey connKey;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
                  * @param rmtNode Remote node.
+                 * @param connKey Connection key.
                  * @param msg Handshake message.
                  * @param createClient If {@code true} creates NIO communication client..
                  * @param fut Connect future.
@@ -771,12 +820,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ConnectClosure(GridNioSession ses,
                     GridNioRecoveryDescriptor recoveryDesc,
                     ClusterNode rmtNode,
+                    ConnectionKey connKey,
                     HandshakeMessage msg,
                     boolean createClient,
                     GridFutureAdapter<GridCommunicationClient> fut) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
+                    this.connKey = connKey;
                     this.msg = msg;
                     this.createClient = createClient;
                     this.fut = fut;
@@ -805,7 +856,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     fut.onDone();
                                 }
                                 finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
+                                    clientFuts.remove(connKey, fut);
                                 }
                             }
                         };
@@ -817,7 +868,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             fut.onDone();
                         }
                         finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                            clientFuts.remove(connKey, fut);
                         }
                     }
                 }
@@ -880,6 +931,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Shared memory server. */
     private IpcSharedMemoryServerEndpoint shmemSrv;
 
+    /** */
+    private int connectionsPerNode = DFLT_CONN_PER_NODE;
+
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
@@ -902,7 +956,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
 
     /** Clients. */
-    private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
 
     /** SPI listener. */
     private volatile CommunicationListener<Message> lsnr;
@@ -949,17 +1003,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     };
 
     /** Client connect futures. */
-    private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts =
+    private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
         GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -1069,6 +1123,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * TODO
+     *
+     * @param maxConnectionsPerNode Number of connections per node.
+     */
+    public void setConnectionsPerNode(int maxConnectionsPerNode) {
+        this.connectionsPerNode = maxConnectionsPerNode;
+    }
+
+    /**
+     * TODO
+     *
+     * @return Number of connections per node.
+     */
+    public int getConnectionsPerNode() {
+        return connectionsPerNode;
+    }
+
+    /**
      * Sets local port to accept shared memory connections.
      * <p>
      * If set to {@code -1} shared memory communication will be disabled.
@@ -1488,7 +1560,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log != null) {
             StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
@@ -1501,34 +1573,46 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", connected=").append(desc.connected())
+                    .append(", reserved=").append(desc.reserved())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
             }
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
                     .append(", msgsRcvd=").append(desc.received())
                     .append(", lastAcked=").append(desc.lastAcknowledged())
                     .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", connected=").append(desc.connected())
+                    .append(", reserved=").append(desc.reserved())
+                    .append(", handshakeIdx=").append(desc.handshakeIndex())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
             }
 
             sb.append("Communication SPI clients: ").append(U.nl());
 
-            for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
-                sb.append("    [node=").append(entry.getKey())
-                    .append(", client=").append(entry.getValue())
-                    .append(']').append(U.nl());
+            for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+                UUID nodeId = entry.getKey();
+                GridCommunicationClient[] clients0 = entry.getValue();
+
+                for (GridCommunicationClient client : clients0) {
+                    if (client != null) {
+                        sb.append("    [node=").append(nodeId)
+                            .append(", client=").append(client)
+                            .append(']').append(U.nl());
+                    }
+                }
             }
 
             U.warn(log, sb.toString());
@@ -1540,6 +1624,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             nioSrvr.dumpStats();
     }
 
+    /** */
+    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+
+    /** */
+    private final AtomicInteger connIdx = new AtomicInteger();
+
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         initFailureDetectionTimeout();
@@ -1553,6 +1643,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
+        assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
+        assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024");
 
         if (!failureDetectionTimeoutEnabled()) {
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -1572,6 +1664,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
         }
 
+        if (connectionsPerNode > 1) {
+            int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0);
+
+            switch (idxMode) {
+                case 0: {
+                    connPlc = new ConnectionPolicy() {
+                        @Override public int connectionIndex() {
+                            return (int)(Thread.currentThread().getId() % connectionsPerNode);
+                        }
+                    };
+
+                    break;
+                }
+
+                case 1: {
+                    connPlc = new ConnectionPolicy() {
+                        @Override public int connectionIndex() {
+                            Integer threadIdx = threadConnIdx.get();
+
+                            if (threadIdx != null)
+                                return threadIdx;
+
+                            for (;;) {
+                                int idx = connIdx.get();
+                                int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1;
+
+                                if (connIdx.compareAndSet(idx, nextIdx)) {
+                                    threadConnIdx.set(idx);
+
+                                    return idx;
+                                }
+                            }
+                        }
+                    };
+
+                    break;
+                }
+            }
+        }
+        else {
+            connPlc = new ConnectionPolicy() {
+                @Override public int connectionIndex() {
+                    return 0;
+                }
+            };
+        }
+
         try {
             locHost = U.resolveLocalHost(locAddr);
         }
@@ -1638,6 +1777,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
+            log.debug(configInfo("connectionsPerNode", connectionsPerNode));
 
             if (failureDetectionTimeoutEnabled()) {
                 log.debug(configInfo("connTimeout", connTimeout));
@@ -1756,9 +1896,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+                        return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
                     }
                 };
 
@@ -1771,9 +1911,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+                        return key != null ? formatter.writer(key.nodeId()) : null;
                     }
                 };
 
@@ -1830,6 +1970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .logger(log)
                         .selectorCount(selectorsCnt)
                         .gridName(gridName)
+                        .serverName("tcp-comm")
                         .tcpNoDelay(tcpNoDelay)
                         .directBuffer(directBuf)
                         .byteOrder(ByteOrder.nativeOrder())
@@ -1949,8 +2090,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         shmemWorkers.clear();
 
         // Force closing on stop (safety).
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         // Clear resources.
         nioSrvr = null;
@@ -1975,8 +2120,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             connectGate.stopped();
 
         // Force closing.
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         getSpiContext().deregisterPorts();
 
@@ -1987,8 +2136,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
         connectGate.disconnected(reconnectFut);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
 
         IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to connect client node disconnected.");
@@ -2012,16 +2165,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     void onNodeLeft(UUID nodeId) {
         assert nodeId != null;
 
-        GridCommunicationClient client = clients.get(nodeId);
-
-        if (client != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
-                    ", client=" + client + ']');
+        GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
-            client.forceClose();
+        if (clients0 != null) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
+                            ", client=" + client + ']');
 
-            clients.remove(nodeId, client);
+                    client.forceClose();
+                }
+            }
         }
     }
 
@@ -2096,11 +2251,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         else {
             GridCommunicationClient client = null;
 
+            int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0;
+
             try {
                 boolean retry;
 
                 do {
-                    client = reserveClient(node);
+                    client = reserveClient(node, connIdx);
 
                     UUID nodeId = null;
 
@@ -2114,7 +2271,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!retry)
                         sentMsgsCnt.increment();
                     else {
-                        clients.remove(node.id(), client);
+                        removeNodeClient(node.id(), client);
 
                         ClusterNode node0 = getSpiContext().node(node.id());
 
@@ -2138,19 +2295,79 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param rmvClient Client to remove.
+     * @return {@code True} if client was removed.
+     */
+    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            if (curClients == null || curClients[rmvClient.connectionIndex()] != rmvClient)
+                return false;
+
+            GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
+
+            newClients[rmvClient.connectionIndex()] = null;
+
+            if (clients.replace(nodeId, curClients, newClients))
+                return true;
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param connIdx Connection index.
+     * @param addClient Client to add.
+     */
+    private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
+        assert connectionsPerNode > 0 : connectionsPerNode;
+
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(node.id());
+
+            assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
+                ", connIdx=" + connIdx +
+                ", client=" + addClient +
+                ", oldClient=" + curClients[connIdx] + ']';
+
+            GridCommunicationClient[] newClients;
+
+            if (curClients == null) {
+                newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1];
+                newClients[connIdx] = addClient;
+
+                if (clients.putIfAbsent(node.id(), newClients) == null)
+                    break;
+            }
+            else {
+                newClients = Arrays.copyOf(curClients, curClients.length);
+                newClients[connIdx] = addClient;
+
+                if (clients.replace(node.id(), curClients, newClients))
+                    break;
+            }
+        }
+    }
+
+    /**
      * Returns existing or just created client to node.
      *
      * @param node Node to which client should be open.
+     * @param connIdx Connection index.
      * @return The existing or just created client.
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException {
+    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         assert node != null;
+        assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx;
 
         UUID nodeId = node.id();
 
         while (true) {
-            GridCommunicationClient client = clients.get(nodeId);
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            GridCommunicationClient client = curClients != null ? curClients[connIdx] : null;
 
             if (client == null) {
                 if (stopping)
@@ -2159,25 +2376,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
+                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+
+                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient client0 = clients.get(nodeId);
+                        GridCommunicationClient[] curClients0 = clients.get(nodeId);
+
+                        GridCommunicationClient client0 = curClients0 != null ? curClients0[connIdx] : null;
 
                         if (client0 == null) {
-                            client0 = createNioClient(node);
+                            client0 = createNioClient(node, connIdx);
 
                             if (client0 != null) {
-                                GridCommunicationClient old = clients.put(nodeId, client0);
-
-                                assert old == null : "Client already created " +
-                                    "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
+                                addNodeClient(node, connIdx, client0);
 
                                 if (client0 instanceof GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
 
-                                    if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) {
+                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Session was closed after client creation, will retry " +
                                                 "[node=" + node + ", client=" + client0 + ']');
@@ -2199,7 +2417,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             throw (Error)e;
                     }
                     finally {
-                        clientFuts.remove(nodeId, fut);
+                        clientFuts.remove(connKey, fut);
                     }
                 }
                 else
@@ -2211,7 +2429,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     continue;
 
                 if (getSpiContext().node(nodeId) == null) {
-                    if (clients.remove(nodeId, client))
+                    if (removeNodeClient(nodeId, client))
                         client.forceClose();
 
                     throw new IgniteSpiException("Destination node is not in topology: " + node.id());
@@ -2222,16 +2440,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 return client;
             else
                 // Client has just been closed by idle worker. Help it and try again.
-                clients.remove(nodeId, client);
+                removeNodeClient(nodeId, client);
         }
     }
 
     /**
      * @param node Node to create client for.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
+    @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
+        throws IgniteCheckedException {
         assert node != null;
 
         Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2250,6 +2470,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             try {
                 GridCommunicationClient client = createShmemClient(
                     node,
+                    connIdx,
                     shmemPort);
 
                 if (log.isDebugEnabled())
@@ -2272,7 +2493,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node);
+            GridCommunicationClient client = createTcpClient(node, connIdx);
 
             if (log.isDebugEnabled())
                 log.debug("TCP client created: " + client);
@@ -2287,10 +2508,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      * @param node Node.
      * @param port Port.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
     @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node,
+        int connIdx,
         Integer port) throws IgniteCheckedException {
         int attempt = 1;
 
@@ -2304,7 +2527,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client;
 
             try {
-                client = new GridShmemCommunicationClient(metricsLsnr,
+                client = new GridShmemCommunicationClient(
+                    connIdx,
+                    metricsLsnr,
                     port,
                     timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
@@ -2325,7 +2550,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+                safeHandshake(client,
+                    null,
+                    node.id(),
+                    timeoutHelper.nextTimeoutChunk(connTimeout0),
+                    null,
+                    null);
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2384,10 +2614,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
         if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
-            UUID id = ses.meta(NODE_ID_META);
+            ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id);
+                ClusterNode node = getSpiContext().node(id.nodeId);
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2401,7 +2631,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log,
                         msg);
 
-                    getSpiContext().failNode(id, msg);
+                    getSpiContext().failNode(id.nodeId(), msg);
                 }
             }
         }
@@ -2411,10 +2641,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Establish TCP connection to remote node and returns client.
      *
      * @param node Remote node.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2482,7 +2713,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node);
+                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+
+                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2503,8 +2736,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             sslEngine.setUseClientMode(true);
                         }
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
-                            timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine);
+                        Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null;
+
+                        rcvCnt = safeHandshake(ch,
+                            recoveryDesc,
+                            node.id(),
+                            timeoutHelper.nextTimeoutChunk(connTimeout0),
+                            sslEngine,
+                            handshakeConnIdx);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2517,7 +2756,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     try {
                         Map<Integer, Object> meta = new HashMap<>();
 
-                        meta.put(NODE_ID_META, node.id());
+                        meta.put(CONN_IDX_META, connKey);
 
                         if (isSslEnabled()) {
                             assert sslEngine != null;
@@ -2533,7 +2772,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         GridNioSession ses = nioSrvr.createSession(ch, meta).get();
 
-                        client = new GridTcpNioCommunicationClient(ses, log);
+                        client = new GridTcpNioCommunicationClient(connIdx, ses, log);
 
                         conn = true;
                     }
@@ -2677,6 +2916,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
      * @param ssl SSL engine if used cryptography, otherwise {@code null}.
+     * @param handshakeConnIdx Non null connection index if need send it in handshake.
      * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
      * @return Handshake response.
      */
@@ -2686,7 +2926,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
-        @Nullable SSLEngine ssl
+        @Nullable SSLEngine ssl,
+        @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
@@ -2766,14 +3007,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(locNode.id(),
-                            recovery.incrementConnectCount(),
-                            recovery.received());
+                        HandshakeMessage msg;
+
+                        int msgSize = 33;
+
+                        if (handshakeConnIdx != null) {
+                            msg = new HandshakeMessage2(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received(),
+                                handshakeConnIdx);
+
+                            msgSize += 4;
+                        }
+                        else {
+                            msg = new HandshakeMessage(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received());
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(33);
+                        buf = ByteBuffer.allocate(msgSize);
 
                         buf.order(ByteOrder.nativeOrder());
 
@@ -2920,42 +3175,57 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         U.join(commWorker, log);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
+        }
     }
 
-    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) {
-        if (useTwoConnections(node))
-            return recoveryDescriptor(outRecDescs, node);
+    /**
+     * @param node Node.
+     * @param key Connection key.
+     * @return Recovery descriptor for outgoing connection.
+     */
+    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+        if (useMultipleConnections(node))
+            return recoveryDescriptor(outRecDescs, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node);
+            return recoveryDescriptor(recoveryDescs, node, key);
     }
 
-    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) {
-        if (useTwoConnections(node))
-            return recoveryDescriptor(inRecDescs, node);
+    /**
+     * @param node Node.
+     * @param key Connection key.
+     * @return Recovery descriptor for incoming connection.
+     */
+    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
+        if (useMultipleConnections(node))
+            return recoveryDescriptor(inRecDescs, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node);
+            return recoveryDescriptor(recoveryDescs, node, key);
     }
 
     /**
      * @param node Node.
-     * @return {@code True} if given node supports two connections per-node for communication.
+     * @return {@code True} if given node supports multiple connections per-node for communication.
      */
-    private boolean useTwoConnections(ClusterNode node) {
+    private boolean useMultipleConnections(ClusterNode node) {
         return true;
     }
 
     /**
+     * @param recoveryDescs Descriptors map.
      * @param node Node.
+     * @param key Connection key.
      * @return Recovery receive data for given node.
      */
     private GridNioRecoveryDescriptor recoveryDescriptor(
-        ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs,
-        ClusterNode node) {
-        ClientKey id = new ClientKey(node.id(), node.order());
-
-        GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+        ClusterNode node,
+        ConnectionKey key) {
+        GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
 
         if (recovery == null) {
             int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
@@ -2963,7 +3233,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
 
             GridNioRecoveryDescriptor old =
-                recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+                recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
 
             if (old != null)
                 recovery = old;
@@ -3005,54 +3275,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
-    /**
-     *
-     */
-    private static class ClientKey {
-        /** */
-        private UUID nodeId;
-
-        /** */
-        private long order;
-
-        /**
-         * @param nodeId Node ID.
-         * @param order Node order.
-         */
-        private ClientKey(UUID nodeId, long order) {
-            this.nodeId = nodeId;
-            this.order = order;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            ClientKey other = (ClientKey)obj;
-
-            return order == other.order && nodeId.equals(other.nodeId);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int)(order ^ (order >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ClientKey.class, this);
-        }
-    }
-
     /** Internal exception class for proper timeout handling. */
     private static class HandshakeTimeoutException extends IgniteCheckedException {
         /** */
@@ -3152,9 +3374,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
+                        return connKey != null ? formatter.writer(connKey.nodeId()) : null;
                     }
                 };
 
@@ -3168,9 +3390,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
+                        return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
                     }
                 };
 
@@ -3251,72 +3473,75 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private void processIdle() {
             cleanupRecovery();
 
-            for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+            for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
                 UUID nodeId = e.getKey();
 
-                GridCommunicationClient client = e.getValue();
+                for (GridCommunicationClient client : e.getValue()) {
+                    if (client == null)
+                        continue;
 
-                ClusterNode node = getSpiContext().node(nodeId);
+                    ClusterNode node = getSpiContext().node(nodeId);
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Forcing close of non-existent node connection: " + nodeId);
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Forcing close of non-existent node connection: " + nodeId);
 
-                    client.forceClose();
+                        client.forceClose();
 
-                    clients.remove(nodeId, client);
+                        removeNodeClient(nodeId, client);
 
-                    continue;
-                }
+                        continue;
+                    }
 
-                GridNioRecoveryDescriptor recovery = null;
+                    GridNioRecoveryDescriptor recovery = null;
 
-                if (!useTwoConnections(node) && client instanceof GridTcpNioCommunicationClient) {
-                    recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+                    if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
+                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
-                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
-                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
-                                ", rcvCnt=" + msg.received() + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                    ", rcvCnt=" + msg.received() + ']');
 
-                        nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
 
-                        recovery.lastAcknowledged(msg.received());
+                            recovery.lastAcknowledged(msg.received());
 
-                        continue;
+                            continue;
+                        }
                     }
-                }
 
-                long idleTime = client.getIdleTime();
+                    long idleTime = client.getIdleTime();
 
-                if (idleTime >= idleConnTimeout) {
-                    if (recovery == null && useTwoConnections(node))
-                        recovery = outRecDescs.get(new ClientKey(node.id(), node.order()));
+                    if (idleTime >= idleConnTimeout) {
+                        if (recovery == null && useMultipleConnections(node))
+                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
-                    if (recovery != null &&
-                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                        !recovery.messagesFutures().isEmpty()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
-                                "will wait: " + nodeId);
+                        if (recovery != null &&
+                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                            !recovery.messagesFutures().isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                    "will wait: " + nodeId);
 
-                        continue;
-                    }
+                            continue;
+                        }
 
-                    if (log.isDebugEnabled())
-                        log.debug("Closing idle node connection: " + nodeId);
+                        if (log.isDebugEnabled())
+                            log.debug("Closing idle node connection: " + nodeId);
 
-                    if (client.close() || client.closed())
-                        clients.remove(nodeId, client);
+                        if (client.close() || client.closed())
+                            removeNodeClient(nodeId, client);
+                    }
                 }
             }
 
             for (GridNioSession ses : nioSrvr.sessions()) {
                 GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
 
-                if (recovery != null && useTwoConnections(recovery.node())) {
+                if (recovery != null && useMultipleConnections(recovery.node())) {
                     assert ses.accepted() : ses;
 
                     sendAckOnTimeout(recovery, ses);
@@ -3354,18 +3579,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /**
-         *
+         * @param recoveryDescs Recovery descriptors to cleanup.
          */
-        private void cleanupRecovery(ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs) {
-            Set<ClientKey> left = null;
+        private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
+            Set<ConnectionKey> left = null;
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
                 if (left != null && left.contains(e.getKey()))
                     continue;
 
-                GridNioRecoveryDescriptor recoverySnd = e.getValue();
+                GridNioRecoveryDescriptor recoveryDesc = e.getValue();
 
-                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
                     if (left == null)
                         left = new HashSet<>();
 
@@ -3376,11 +3601,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             if (left != null) {
                 assert !left.isEmpty();
 
-                for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
+                for (ConnectionKey id : left) {
+                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);
 
-                    if (recoverySnd != null && recoverySnd.onNodeLeft())
-                        recoveryDescs.remove(id);
+                    if (recoveryDesc != null && recoveryDesc.onNodeLeft())
+                        recoveryDescs.remove(id, recoveryDesc);
                 }
             }
         }
@@ -3389,45 +3614,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            if (sesInfo.reconnect) {
-                GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
+            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
-                ClusterNode node = recoveryDesc.node();
+            ClusterNode node = recoveryDesc.node();
 
-                if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    return;
+            if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                    GridCommunicationClient client = reserveClient(node);
+                GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);
 
-                    client.release();
-                }
-                catch (IgniteCheckedException | IgniteException e) {
-                    try {
-                        if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, will retry " +
-                                    "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
-
-                            addProcessDisconnectRequest(sesInfo);
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, " +
-                                    "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+                client.release();
+            }
+            catch (IgniteCheckedException | IgniteException e) {
+                try {
+                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Recovery reconnect failed, will retry " +
+                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
 
-                            onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                                e);
-                        }
+                        addProcessDisconnectRequest(sesInfo);
                     }
-                    catch (IgniteClientDisconnectedException e0) {
+                    else {
                         if (log.isDebugEnabled())
-                            log.debug("Failed to ping node, client disconnected.");
+                            log.debug("Recovery reconnect failed, " +
+                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                        onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+                            e);
                     }
                 }
+                catch (IgniteClientDisconnectedException e0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to ping node, client disconnected.");
+                }
             }
         }
 
@@ -3631,6 +3854,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /**
+         * @return Connection index.
+         */
+        public int connectionIndex() {
+            return 0;
+        }
+
+        /**
          * @return Connect count.
          */
         public long connectCount() {
@@ -3661,7 +3891,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             if (buf.remaining() < 33)
                 return false;
 
-            buf.put(HANDSHAKE_MSG_TYPE);
+            buf.put(directType());
 
             byte[] bytes = U.uuidToBytes(nodeId);
 
@@ -3711,6 +3941,78 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     *
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class HandshakeMessage2 extends HandshakeMessage {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private int connIdx;
+
+        /**
+         *
+         */
+        public HandshakeMessage2() {
+            // No-op.
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param connectCnt Connect count.
+         * @param rcvCnt Number of received messages.
+         * @param connIdx Connection index.
+         */
+        HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+            super(nodeId, connectCnt, rcvCnt);
+
+            this.connIdx = connIdx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte directType() {
+            return 125;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int connectionIndex() {
+            return connIdx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            if (!super.writeTo(buf, writer))
+                return false;
+
+            if (buf.remaining() < 4)
+                return false;
+
+            buf.putInt(connIdx);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            if (!super.readFrom(buf, reader))
+                return false;
+
+            if (buf.remaining() < 4)
+                return false;
+
+            connIdx = buf.getInt();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HandshakeMessage2.class, this);
+        }
+    }
+
+    /**
      * Recovery acknowledgment message.
      */
     @SuppressWarnings("PublicInnerClass")
@@ -3955,16 +4257,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
-        private final boolean reconnect;
+        private int connIdx;
 
         /**
          * @param recoveryDesc Recovery descriptor.
-         * @param reconnect Reconnect flag.
+         * @param connIdx Connection index.
          */
-        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
-            boolean reconnect) {
+        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) {
             this.recoveryDesc = recoveryDesc;
-            this.reconnect = reconnect;
+            this.connIdx = connIdx;
         }
 
         /** {@inheritDoc} */
@@ -3972,4 +4273,85 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return S.toString(DisconnectedSessionInfo.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class ConnectionKey {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private final int idx;
+
+        /** */
+        private final long connCnt;
+
+        /**
+         * @param nodeId Node ID.
+         * @param idx Connection index.
+         * @param connCnt Connection counter (set only for incoming connections).
+         */
+        ConnectionKey(UUID nodeId, int idx, long connCnt) {
+            this.nodeId = nodeId;
+            this.idx = idx;
+            this.connCnt = connCnt;
+        }
+
+        /**
+         * @return Connection counter.
+         */
+        long connectCount() {
+            return connCnt;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Connection index.
+         */
+        int connectionIndex() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ConnectionKey key = (ConnectionKey) o;
+
+            return idx == key.idx && nodeId.equals(key.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+            res = 31 * res + idx;
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ConnectionKey.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    interface ConnectionPolicy {
+        /**
+         * @return Thread connection index.
+         */
+        int connectionIndex();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 21204c7..509a25e 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -182,6 +182,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         try {
             srv = new GridNioServer.Builder<byte[]>()
                 .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .serverName("sock-streamer")
                 .port(port)
                 .listener(lsnr)
                 .logger(log)

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 289278f..e9d6c7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -76,6 +76,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         commSpi.setSlowClientQueueLimit(50);
         commSpi.setSharedMemoryPort(-1);
         commSpi.setIdleConnectionTimeout(300_000);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 0460a8f..1bfd727 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
 
         commSpi.setSocketWriteTimeout(1000);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(connectionsPerNode());
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     }
 
     /**
+     * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}.
+     */
+    protected int connectionsPerNode() {
+        return TcpCommunicationSpi.DFLT_CONN_PER_NODE;
+    }
+
+    /**
      * @return Cache atomicity mode.
      */
     protected abstract CacheAtomicityMode atomicityMode();
@@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     static boolean closeSessions(Ignite ignite) throws Exception {
         TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-        Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
+        Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, "clients");
 
         boolean closed = false;
 
-        for (GridCommunicationClient client : clients.values()) {
-            GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
 
-            GridNioSession ses = client0.session();
+                    GridNioSession ses = client0.session();
 
-            ses.close();
+                    ses.close();
 
-            closed = true;
+                    closed = true;
+                }
+            }
         }
 
         return closed;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 6256225..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         // Try provoke connection close on socket writeTimeout.
         commSpi.setSharedMemoryPort(-1);
         commSpi.setMessageQueueLimit(10);
-        commSpi.setSocketReceiveBuffer(32);
-        commSpi.setSocketSendBuffer(32);
+        commSpi.setSocketReceiveBuffer(40);
+        commSpi.setSocketSendBuffer(40);
         commSpi.setSocketWriteTimeout(100);
         commSpi.setUnacknowledgedMessagesBufferSize(1000);
         commSpi.setConnectTimeout(10_000);
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testMessageQueueLimit() throws Exception {
-        startGridsMultiThreaded(3);
-
-        for (int i = 0; i < 15; i++) {
+        for (int i = 0; i < 3; i++) {
             log.info("Iteration: " + i);
 
+            startGridsMultiThreaded(3);
+
             IgniteInternalFuture<?> fut1 = startJobThreads(50);
 
             U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
 
             fut1.get();
             fut2.get();
+
+            stopAllGrids();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..5ca8f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
     private static final int commExtPort2 = 20100;
 
     /** */
-    private AddressResolver resolver;
+    private AddressResolver rslvr;
 
     /** */
     private boolean ipFinderUseLocPorts;
@@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         cfg.setConnectorConfiguration(null);
 
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
-            @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+            @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+                throws IgniteCheckedException {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 
                 attrs.remove(createSpiAttributeName(ATTR_PORT));
 
                 ((TcpDiscoveryNode)node).setAttributes(attrs);
 
-                return super.createTcpClient(node);
+                return super.createTcpClient(node, connIdx);
             }
         };
 
@@ -126,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         commSpi.setLocalPort(commLocPort);
         commSpi.setLocalPortRange(1);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 
-        assert resolver != null;
+        assert rslvr != null;
 
-        cfg.setAddressResolver(resolver);
+        cfg.setAddressResolver(rslvr);
 
         return cfg;
     }
@@ -147,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2)));
         map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2)));
 
-        resolver = new AddressResolver() {
+        rslvr = new AddressResolver() {
             @Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) {
                 return map.get(addr);
             }
@@ -167,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2);
         map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2);
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         doTestForward();
     }
@@ -180,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
 
         map.put("127.0.0.1", "127.0.0.1");
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         ipFinderUseLocPorts = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 076724d..3c4fea0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.afterTest();
 
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+            ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
+
+            for (int i = 0; i < 20; i++) {
+                GridCommunicationClient client0 = null;
+
+                for (GridCommunicationClient[] clients0 : clients.values()) {
+                    for (GridCommunicationClient client : clients0) {
+                        if (client != null) {
+                            client0 = client;
+
+                            break;
+                        }
+                    }
+
+                    if (client0 != null)
+                        break;
+                }
+
+                if (client0 == null)
+                    return;
 
-            for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
                 info("Check failed for SPI [grid=" +
-                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']');
+                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") +
+                    ", client=" + client0 +
+                    ", spi=" + spi + ']');
 
                 U.sleep(1000);
             }
 
-            assert clients.isEmpty() : "Clients: " + clients;
+            fail("Failed to wait when clients are closed.");
         }
     }
 }
\ No newline at end of file


Mime
View raw message