ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [ignite] branch master updated: IGNITE-12438 Extend communication protocol to support one-way client-server connections - Fixes #7639.
Date Thu, 09 Jul 2020 14:12:43 GMT
This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ed0767d  IGNITE-12438 Extend communication protocol to support one-way client-server connections - Fixes #7639.
ed0767d is described below

commit ed0767d275ef6968ae083b9e60e5e4577112a6b5
Author: Ivan Bessonov <bessonov.ip@gmail.com>
AuthorDate: Thu Jul 9 16:55:22 2020 +0300

    IGNITE-12438 Extend communication protocol to support one-way client-server connections - Fixes #7639.
---
 .../ignite/configuration/IgniteConfiguration.java  |   2 +-
 .../java/org/apache/ignite/internal/GridTopic.java |   5 +-
 .../managers/communication/GridIoManager.java      | 241 ++++++++++--
 .../managers/communication/GridIoMessage.java      |  15 +-
 .../communication/GridIoMessageFactory.java        |   2 +
 .../communication/GridIoSecurityAwareMessage.java  |   8 +-
 .../processors/cache/mvcc/MvccProcessorImpl.java   |  14 +-
 .../spi/communication/tcp/TcpCommunicationSpi.java | 271 ++++++++++---
 .../tcp/internal/ConnectionRequestFuture.java      |  28 ++
 .../tcp/internal/ConnectionRequestor.java}         |  30 +-
 .../tcp/internal/NodeUnreachableException.java     |  49 +++
 .../internal/TcpConnectionIndexAwareMessage.java}  |  25 +-
 .../TcpConnectionRequestDiscoveryMessage.java      |  97 +++++
 .../TcpInverseConnectionResponseMessage.java}      | 103 ++---
 .../TcpDiscoveryServerOnlyCustomEventMessage.java  |   3 +-
 .../main/resources/META-INF/classnames.properties  |   2 +-
 .../IgniteCacheClientReconnectTest.java            |  63 ++++
 .../datastreamer/DataStreamerImplSelfTest.java     |   5 +-
 ...unicationInverseConnectionEstablishingTest.java | 417 +++++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java     |  46 +++
 .../IgniteSpiCommunicationSelfTestSuite.java       |   5 +-
 21 files changed, 1243 insertions(+), 188 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 38f33c0..1dd2466 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -2833,7 +2833,7 @@ public class IgniteConfiguration {
      *
      * @param state New cluster state on start.
      * @return {@code this} for chaining.
-     * @see #getClusterStateOnStart() 
+     * @see #getClusterStateOnStart()
      */
     public IgniteConfiguration setClusterStateOnStart(ClusterState state) {
         this.clusterStateOnStart = state;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 1110091..0cf845c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -146,7 +146,10 @@ public enum GridTopic {
     TOPIC_DEADLOCK_DETECTION,
 
     /** Message topic for the distributed process. See {@link DistributedProcess}. */
-    TOPIC_DISTRIBUTED_PROCESS;
+    TOPIC_DISTRIBUTED_PROCESS,
+
+    /** */
+    TOPIC_COMM_SYSTEM;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3ece5c9..452a5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
@@ -134,6 +135,9 @@ import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor;
+import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
+import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -141,6 +145,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_SYSTEM;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
@@ -160,6 +165,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTI
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
 import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.ATTR_PAIRED_CONN;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
 
 /**
@@ -322,7 +329,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         new ConcurrentHashMap<>();
 
     /** Local node ID. */
-    private final UUID locNodeId;
+    private volatile UUID locNodeId;
 
     /** Cache for messages that were received prior to discovery. */
     private final ConcurrentMap<UUID, Deque<DelayedMessage>> waitMap = new ConcurrentHashMap<>();
@@ -363,6 +370,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
 
+    /** */
+    private final TcpCommunicationInverseConnectionHandler invConnHandler = new TcpCommunicationInverseConnectionHandler();
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = () -> {};
 
@@ -465,12 +475,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         msgFactory = new IgniteMessageFactoryImpl(msgs);
 
+        CommunicationSpi<Serializable> spi = getSpi();
+
+        if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
+            getTcpCommunicationSpi().setConnectionRequestor(invConnHandler);
+
         startSpi();
 
         MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
 
-        CommunicationSpi spi = ctx.config().getCommunicationSpi();
-
         ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
                 "Outbound messages queue size.");
 
@@ -555,6 +568,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         });
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+        locNodeId = ctx.localNodeId();
+
+        return super.onReconnected(clusterRestarted);
+    }
+
     /**
      * @param nodes Nodes.
      * @param payload Payload.
@@ -1010,6 +1030,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
+        invConnHandler.onStart();
+
         // Make sure that there are no stale messages due to window between communication
         // manager start and kernal start.
         // 1. Process wait list.
@@ -1083,6 +1105,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
     }
 
+    /**
+     * Checks that both local and remote nodes are configured to use paired connections.
+     *
+     * @param node Remote node.
+     * @param tcpCommSpi TcpCommunicationSpi.
+     * @return {@code True} if both local and remote nodes are configured to use paired connections.
+     */
+    private boolean isPairedConnection(ClusterNode node, TcpCommunicationSpi tcpCommSpi) {
+        return tcpCommSpi.isUsePairedConnections() &&
+            Boolean.TRUE.equals(node.attribute(U.spiAttribute(tcpCommSpi, ATTR_PAIRED_CONN)));
+    }
+
+    /**
+     * @return Instance of {@link TcpCommunicationSpi}. Will throw {@link AssertionError} or {@link ClassCastException}
+     *      if another SPI type is configured. Must be called only if type of SPI has been explicitly asserted earlier.
+     */
+    private TcpCommunicationSpi getTcpCommunicationSpi() {
+        CommunicationSpi<?> spi = getSpi();
+
+        assert spi instanceof TcpCommunicationSpi;
+
+        return (TcpCommunicationSpi)spi;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("BusyWait")
     @Override public void onKernalStop0(boolean cancel) {
@@ -1141,6 +1187,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
         stopSpi();
 
+        invConnHandler.onStop();
+
         if (log.isDebugEnabled())
             log.debug(stopInfo());
     }
@@ -1954,7 +2002,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             PUBLIC_POOL,
             false,
             0,
-            false);
+            false,
+            UNDEFINED_CONNECTION_INDEX
+        );
 
         try {
             if (topicOrd < 0)
@@ -1998,7 +2048,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         long timeout,
         boolean skipOnTimeout,
         IgniteInClosure<IgniteException> ackC,
-        boolean async
+        boolean async,
+        int connIdx
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -2006,7 +2057,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
         assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
 
-        GridIoMessage ioMsg = createGridIoMessage(topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
+        GridIoMessage ioMsg = createGridIoMessage(topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, connIdx);
 
         if (locNodeId.equals(node.id())) {
             assert plc != P2P_POOL;
@@ -2030,15 +2081,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             if (topicOrd < 0)
                 ioMsg.topicBytes(U.marshal(marsh, topic));
 
-            try {
-                if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
-                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
-                else
-                    getSpi().sendMessage(node, ioMsg);
-            }
-            catch (IgniteSpiException e) {
-                if (e.getCause() instanceof ClusterTopologyCheckedException)
-                    throw (ClusterTopologyCheckedException)e.getCause();
+                try {
+                    if ((CommunicationSpi<?>)getSpi() instanceof TcpCommunicationSpi)
+                        getTcpCommunicationSpi().sendMessage(node, ioMsg, ackC);
+                    else
+                        getSpi().sendMessage(node, ioMsg);
+                }
+                catch (IgniteSpiException e) {
+                    if (e.getCause() instanceof ClusterTopologyCheckedException)
+                        throw (ClusterTopologyCheckedException)e.getCause();
 
                 if (!ctx.discovery().alive(node))
                     throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id(), e);
@@ -2052,6 +2103,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /** */
+    private long getInverseConnectionWaitTimeout() {
+        return ctx.config().getFailureDetectionTimeout();
+    }
+
+    /**
+     * @return One of two message wrappers. The first is {@link GridIoMessage}, the second is secured version {@link
+     * GridIoSecurityAwareMessage}.
+     */
     private @NotNull GridIoMessage createGridIoMessage(
         Object topic,
         int topicOrd,
@@ -2059,7 +2118,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         boolean ordered,
         long timeout,
-        boolean skipOnTimeout) {
+        boolean skipOnTimeout,
+        int connIdx
+    ) {
         if (ctx.security().enabled()) {
             UUID secSubjId = null;
 
@@ -2068,10 +2129,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             if (!locNodeId.equals(curSecSubjId))
                 secSubjId = curSecSubjId;
 
-            return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+            return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx);
         }
 
-        return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+        return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx);
     }
 
     /**
@@ -2105,7 +2166,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2117,7 +2178,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, null, false);
+        send(node, topic, -1, msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2129,7 +2190,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2142,7 +2203,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2164,7 +2225,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2181,7 +2242,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
     {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2208,7 +2269,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         for (ClusterNode node : nodes) {
             try {
-                send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false);
+                send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false, UNDEFINED_CONNECTION_INDEX);
             }
             catch (IgniteCheckedException e) {
                 if (err == null)
@@ -2239,7 +2300,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         for (ClusterNode node : nodes) {
             try {
-                send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+                send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX);
             }
             catch (IgniteCheckedException e) {
                 if (err == null)
@@ -2274,7 +2335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false, UNDEFINED_CONNECTION_INDEX);
     }
 
     /**
@@ -2349,7 +2410,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 0,
                 false,
                 null,
-                async);
+                async,
+                UNDEFINED_CONNECTION_INDEX
+            );
         }
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
@@ -2372,7 +2435,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     0,
                     false,
                     null,
-                    async);
+                    async,
+                    UNDEFINED_CONNECTION_INDEX
+                );
             }
         }
     }
@@ -4226,4 +4291,122 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         return null;
     }
+
+    /**
+     * Responsible for handling network situation where server cannot open connection to client and
+     * has to ask client to establish a connection to specific server.
+     *
+     * This includes the following steps:
+     * <ol>
+     *     <li>
+     *         Server tries to send regular communication message to unreachable client,
+     *         detects that client is unreachable and directs special discovery message to it.
+     *         After that it waits for the client to reply.
+     *     </li>
+     *     <li>
+     *         Client receives discovery message and sends special communication message in response.
+     *         This action opens communication channel between client and server that can be used by both sides.
+     *     </li>
+     *     <li>
+     *         Server on receiving comm message sends original communication message to the client.
+     *     </li>
+     * </ol>
+     */
+    private final class TcpCommunicationInverseConnectionHandler implements ConnectionRequestor {
+        /**
+         * Executor service to send special communication message.
+         */
+        private ExecutorService responseSendService = Executors.newCachedThreadPool();
+
+        /**
+         * Discovery event listener (works only on client nodes for now) notified when
+         * inverse connection request arrives.
+         */
+        private CustomEventListener<TcpConnectionRequestDiscoveryMessage> discoConnReqLsnr = (topVer, snd, msg) -> {
+            if (!locNodeId.equals(msg.receiverNodeId()))
+                return;
+
+            if (log.isInfoEnabled())
+                log.info("Received inverse communication request from " + snd + " for connection index "
+                    + msg.connectionIndex());
+
+            TcpCommunicationSpi tcpCommSpi = getTcpCommunicationSpi();
+
+            assert !isPairedConnection(snd, tcpCommSpi);
+
+            int connIdx = msg.connectionIndex();
+
+            responseSendService.submit(() -> {
+                try {
+                    send(snd,
+                        TOPIC_COMM_SYSTEM,
+                        TOPIC_COMM_SYSTEM.ordinal(),
+                        new TcpInverseConnectionResponseMessage(connIdx),
+                        SYSTEM_POOL,
+                        false,
+                        0,
+                        false,
+                        null,
+                        false,
+                        connIdx
+                    );
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send response to inverse communication connection request from node: " + snd.id(), e);
+                }
+            });
+        };
+
+        /** */
+        public void onStart() {
+            if (ctx.clientNode())
+                ctx.discovery().setCustomEventListener(TcpConnectionRequestDiscoveryMessage.class, invConnHandler.discoConnReqLsnr);
+
+            addMessageListener(TOPIC_COMM_SYSTEM, (nodeId, msg, plc) -> {
+                if (msg instanceof TcpInverseConnectionResponseMessage) {
+                    if (log.isInfoEnabled())
+                        log.info("Response for inverse connection received from node " + nodeId +
+                            ", connection index is " + ((TcpInverseConnectionResponseMessage)msg).connectionIndex());
+                }
+            });
+        }
+
+        /**
+         * Executes inverse connection protocol by sending discovery request and then waiting on future
+         * completed when response arrives or timeout is reached.
+         *
+         * @param node Unreachable node.
+         * @param connIdx Connection index.
+         */
+        @Override public void request(ClusterNode node, int connIdx) {
+            TcpCommunicationSpi tcpCommSpi = getTcpCommunicationSpi();
+
+            if (isPairedConnection(node, tcpCommSpi))
+                throw new IgniteSpiException("Inverse connection protocol doesn't support paired connections");
+
+            try {
+                if (log.isInfoEnabled())
+                    log.info("TCP connection failed, node " + node.id() + " is unreachable," +
+                        " will attempt to request inverse connection via discovery SPI.");
+
+                TcpConnectionRequestDiscoveryMessage msg = new TcpConnectionRequestDiscoveryMessage(
+                    node.id(), connIdx
+                );
+
+                ctx.discovery().sendCustomEvent(msg);
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteSpiException(ex);
+            }
+        }
+
+        /** */
+        public void onStop() {
+            U.shutdownNow(
+                TcpCommunicationInverseConnectionHandler.class,
+                responseSendService,
+                log
+            );
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index fe61aec..e01c0a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -29,12 +29,13 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper for all grid messages.
  */
-public class GridIoMessage implements Message {
+public class GridIoMessage implements TcpConnectionIndexAwareMessage {
     /** */
     public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE;
 
@@ -67,6 +68,9 @@ public class GridIoMessage implements Message {
     /** Message. */
     private Message msg;
 
+    /** */
+    private transient int connIdx = UNDEFINED_CONNECTION_INDEX;
+
     /**
      * No-op constructor to support {@link Externalizable} interface.
      * This constructor is not meant to be used for other purposes.
@@ -91,7 +95,8 @@ public class GridIoMessage implements Message {
         Message msg,
         boolean ordered,
         long timeout,
-        boolean skipOnTimeout
+        boolean skipOnTimeout,
+        int connIdx
     ) {
         assert topic != null;
         assert topicOrd <= Byte.MAX_VALUE;
@@ -104,6 +109,7 @@ public class GridIoMessage implements Message {
         this.ordered = ordered;
         this.timeout = timeout;
         this.skipOnTimeout = skipOnTimeout;
+        this.connIdx = connIdx;
     }
 
     /**
@@ -169,6 +175,11 @@ public class GridIoMessage implements Message {
         return skipOnTimeout;
     }
 
+    /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
     /**
      * @return {@code True} if message is ordered, {@code false} otherwise.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 0d7976e..7a0ab93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -198,6 +198,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
@@ -401,6 +402,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
         factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
         factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
         factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
+        factory.register((short)177, TcpInverseConnectionResponseMessage::new);
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] - this
         // [120..123] - DR
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
index c1c7d97..a9b62af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 
 /**
  *
@@ -54,6 +55,7 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
      * @param ordered Message ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param connIdx Desired {@link TcpCommunicationSpi} connection index if applicable.
      */
     public GridIoSecurityAwareMessage(
         UUID secSubjId,
@@ -63,8 +65,10 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
         Message msg,
         boolean ordered,
         long timeout,
-        boolean skipOnTimeout) {
-        super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+        boolean skipOnTimeout,
+        int connIdx
+    ) {
+        super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx);
 
         this.secSubjId = secSubjId;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 83deee5..7fcc291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -591,12 +591,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             prevQueries.init(nodes, ctx.discovery()::alive);
         }
         else if (sndQrys) {
-            try {
-                sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(qryIds));
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
-            }
+            ctx.getSystemExecutorService().submit(() -> {
+                try {
+                    sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(qryIds));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
+                }
+            });
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 18139fe..c862956 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -109,7 +109,6 @@ import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -119,6 +118,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -150,18 +150,24 @@ import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
 import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestFuture;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor;
 import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
+import org.apache.ignite.spi.communication.tcp.internal.NodeUnreachableException;
 import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
 import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
+import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
 import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
 import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 import static java.util.Collections.emptyList;
 import static java.util.Objects.nonNull;
@@ -176,6 +182,7 @@ import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
 import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
 import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
@@ -313,6 +320,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** */
     public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
 
+    /** */
+    public static final String ATTR_FORCE_CLIENT_SERVER_CONNECTIONS = "comm.force.client.srv.connections";
+
     /** Default port which node sets listener to (value is <tt>47100</tt>). */
     public static final int DFLT_PORT = 47100;
 
@@ -583,7 +593,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                     HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                    sndId = ((HandshakeMessage)msg).nodeId();
+                    sndId = msg0.nodeId();
                     connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
@@ -615,19 +625,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     if (unknownNode) {
                         U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
 
-                        ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                ses.close();
-                            }
-                        });
-                    }
-                    else {
-                        ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                ses.close();
-                            }
-                        });
+                        ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(fut -> ses.close());
                     }
+                    else
+                        ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(fut -> ses.close());
 
                     return;
                 }
@@ -774,8 +775,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                                 new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
 
+                            GridTcpNioCommunicationClient client = null;
+
                             if (reserved)
-                                connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+                                client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
+
+                            if (oldFut instanceof ConnectionRequestFuture && !oldFut.isDone())
+                                oldFut.onDone(client);
                         }
                     }
                 }
@@ -851,11 +857,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         if (log.isDebugEnabled())
                             log.debug("Close incoming connection, failed to enter gateway.");
 
-                        ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                ses.close();
-                            }
-                        });
+                        ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(fut -> ses.close());
 
                         return;
                     }
@@ -1327,6 +1329,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      */
     private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
 
+    /** */
+    private boolean forceClientToSrvConnections;
+
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
@@ -1355,6 +1360,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** */
     private final GridLocalEventListener discoLsnr = new DiscoveryListener();
 
+    /** Connection requestor. */
+    private ConnectionRequestor connectionRequestor;
+
     /**
      * @return {@code True} if ssl enabled.
      */
@@ -2020,6 +2028,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         return this;
     }
 
+    /**
+     * @return Force client to server connections flag.
+     *
+     * @see #setForceClientToServerConnections(boolean)
+     */
+    @IgniteExperimental
+    public boolean forceClientToServerConnections() {
+        return forceClientToSrvConnections;
+    }
+
+    /**
+     * Applicable for clients only. Sets SPI in the mode when server node cannot open TCP connection to the current
+     * node. Possible reasons for that may be specific network configurations or security rules.
+     * In this mode, when server needs the connection with client, it uses {@link DiscoverySpi} protocol to notify
+     * client about it. After that client opens the required connection from its side.
+     */
+    @IgniteExperimental
+    @IgniteSpiConfiguration(optional = true)
+    public TcpCommunicationSpi setForceClientToServerConnections(boolean forceClientToSrvConnections) {
+        this.forceClientToSrvConnections = forceClientToSrvConnections;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public void setListener(CommunicationListener<Message> lsnr) {
         this.lsnr = lsnr;
@@ -2032,6 +2064,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         return lsnr;
     }
 
+    /** */
+    @IgniteExperimental
+    public void setConnectionRequestor(ConnectionRequestor connectionRequestor) {
+        this.connectionRequestor = connectionRequestor;
+    }
+
     /** {@inheritDoc} */
     @Override public int getSentMessagesCount() {
         // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
@@ -2324,6 +2362,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e);
         }
 
+        boolean forceClientToSrvConnections = forceClientToServerConnections();
+
+        if (usePairedConnections && forceClientToSrvConnections) {
+            throw new IgniteSpiException("Node using paired connections " +
+                "is not allowed to start in forced client to server connections mode.");
+        }
+
         // Set local node attributes.
         try {
             IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost);
@@ -2343,6 +2388,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
             res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
             res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
+            res.put(createSpiAttributeName(ATTR_FORCE_CLIENT_SERVER_CONNECTIONS), forceClientToSrvConnections);
 
             return res;
         }
@@ -2557,13 +2603,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());
 
                 IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
-                    !clientMode && slowClientQueueLimit > 0 ?
-                        new CI2<GridNioSession, Integer>() {
-                            @Override public void apply(GridNioSession ses, Integer qSize) {
-                                checkClientQueueSize(ses, qSize);
-                            }
-                        } :
-                        null;
+                    !clientMode && slowClientQueueLimit > 0 ? this::checkClientQueueSize : null;
 
                 GridNioFilter[] filters;
 
@@ -2720,9 +2760,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.cancel(commWorker);
-        U.join(commWorker, log);
-
         U.cancel(shmemAcceptWorker);
         U.join(shmemAcceptWorker, log);
 
@@ -2739,6 +2776,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             }
         }
 
+        for (GridFutureAdapter<GridCommunicationClient> fut : clientFuts.values()) {
+            if (fut instanceof ConnectionRequestFuture) {
+                // There's no way it would be done by itself at this point.
+                fut.onDone(new IgniteSpiException("SPI is being stopped."));
+            }
+        }
+
+        if (commWorker != null) {
+            U.cancel(commWorker);
+            U.join(commWorker, log);
+        }
+
         // Clear resources.
         nioSrvr = null;
         commWorker = null;
@@ -2916,7 +2965,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         else {
             GridCommunicationClient client = null;
 
-            int connIdx = connPlc.connectionIndex();
+            int connIdx;
+
+            if (msg instanceof TcpConnectionIndexAwareMessage) {
+                int msgConnIdx = ((TcpConnectionIndexAwareMessage)msg).connectionIndex();
+
+                connIdx = msgConnIdx == UNDEFINED_CONNECTION_INDEX ? connPlc.connectionIndex() : msgConnIdx;
+            }
+            else
+                connIdx = connPlc.connectionIndex();
 
             try {
                 boolean retry;
@@ -3034,7 +3091,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     break;
             }
             else {
-                newClients = Arrays.copyOf(curClients, curClients.length);
+                newClients = curClients.clone();
                 newClients[connIdx] = addClient;
 
                 if (clients.replace(node.id(), curClients, newClients))
@@ -3110,6 +3167,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                         fut.onDone(client0);
                     }
+                    catch (NodeUnreachableException e) {
+                        log.warning(e.getMessage());
+
+                        fut = handleUnreachableNodeException(node, connIdx, fut, e);
+                    }
                     catch (Throwable e) {
                         fut.onDone(e);
 
@@ -3154,6 +3216,76 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * Handles {@link NodeUnreachableException}. This means that the method will try to trigger client itself to open
+     * connection. The only possible way of doing this is to use {@link #connectionRequestor}'s trigger and wait.
+     * Specifics of triggers implementation technically should be considered unknown, but for now it's not true and we
+     * expect that {@link NodeUnreachableException} won't be thrown in {@link IgniteDiscoveryThread}.
+     *
+     * @param node Node to open connection to.
+     * @param connIdx Connection index.
+     * @param fut Current future for opening connection.
+     * @param e Current exception.
+     * @return New future that will return the client or error. {@code null} client is possible if newly opened
+     *      connection has been closed by idle worker, at least that's what documentation says.
+     * @throws IgniteCheckedException If triggering failed or trigger is not configured.
+     */
+    private GridFutureAdapter<GridCommunicationClient> handleUnreachableNodeException(
+        ClusterNode node,
+        int connIdx,
+        GridFutureAdapter<GridCommunicationClient> fut,
+        NodeUnreachableException e
+    ) throws IgniteCheckedException {
+        if (connectionRequestor != null) {
+            ConnectFuture fut0 = (ConnectFuture)fut;
+
+            ConnectionRequestFuture triggerFut = new ConnectionRequestFuture();
+
+            triggerFut.listen(f -> {
+                try {
+                    fut0.onDone(f.get());
+                }
+                catch (Throwable t) {
+                    fut0.onDone(t);
+                }
+            });
+
+            clientFuts.put(new ConnectionKey(node.id(), connIdx, -1), triggerFut);
+
+            fut = triggerFut;
+
+            try {
+                connectionRequestor.request(node, connIdx);
+
+                long failTimeout = failureDetectionTimeoutEnabled()
+                    ? failureDetectionTimeout()
+                    : getConnectTimeout();
+
+                fut.get(failTimeout);
+            }
+            catch (IgniteCheckedException triggerException) {
+                IgniteSpiException spiE = new IgniteSpiException(triggerException);
+
+                spiE.addSuppressed(e);
+
+                String msg = "Failed to wait for establishing inverse communication connection from node " + node;
+
+                log.warning(msg, spiE);
+
+                fut.onDone(spiE);
+
+                throw spiE;
+            }
+        }
+        else {
+            fut.onDone(e);
+
+            throw new IgniteCheckedException(e);
+        }
+
+        return fut;
+    }
+
+    /**
      * @param node Node to create client for.
      * @param connIdx Connection index.
      * @return Client.
@@ -3206,12 +3338,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
         final long time = System.currentTimeMillis() - start;
 
-            if (time > CONNECTION_ESTABLISH_THRESHOLD_MS) {
-                if (log.isInfoEnabled())
-                    log.info("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");
-            }
-            else if (log.isDebugEnabled())
-                log.debug("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");
+        if (time > CONNECTION_ESTABLISH_THRESHOLD_MS) {
+            if (log.isInfoEnabled())
+                log.info("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");
+        }
+        else if (log.isDebugEnabled())
+            log.debug("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");
 
         return client;
     }
@@ -3229,12 +3361,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         if (client == null) {
             assert node != null;
 
-            StringJoiner joiner = new StringJoiner(", ");
+            StringJoiner joiner = new StringJoiner(", ", "null, node addrs=[", "]");
 
             for (InetSocketAddress addr : nodeAddresses(node))
                 joiner.add(addr.toString());
 
-            return "null, node addrs=[" + joiner.toString() + "]";
+            return joiner.toString();
         }
         else
             return client.toString();
@@ -3498,6 +3630,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @throws IgniteCheckedException If establish connection fails.
      */
     private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
+        boolean locNodeIsSrv = !getLocalNode().isClient() && !getLocalNode().isDaemon();
+
+        if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
+            if (node.isClient() && forceClientToServerConnections(node)) {
+                String msg = "Failed to connect to node " + node.id() + " because it is started" +
+                    " in 'forceClientToServerConnections' mode; inverse connection will be requested.";
+
+                throw new NodeUnreachableException(msg);
+            }
+        }
+
         Collection<InetSocketAddress> addrs = nodeAddresses(node);
 
         GridNioSession ses = null;
@@ -3515,9 +3658,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             );
         }
 
+        Set<InetSocketAddress> failedAddrsSet = new HashSet<>();
+        int skippedAddrs = 0;
+
         for (InetSocketAddress addr : addrs) {
-            if (addr.isUnresolved())
+            if (addr.isUnresolved()) {
+                failedAddrsSet.add(addr);
+
                 continue;
+            }
 
             TimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(
                 totalTimeout,
@@ -3534,6 +3683,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         log.debug("Skipping local address [addr=" + addr +
                             ", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
                             ", node=" + node + ']');
+
+                    skippedAddrs++;
+
                     break;
                 }
 
@@ -3749,6 +3901,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         break;
                     }
 
+                    // Inverse communication protocol works only for client nodes.
+                    if (node.isClient() && isNodeUnreachableException(e))
+                        failedAddrsSet.add(addr);
+
                     if (isRecoverableException(e))
                         U.sleep(DFLT_RECONNECT_DELAY);
                     else {
@@ -3779,8 +3935,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 break;
         }
 
-        if (ses == null)
+        if (ses == null) {
+            if (!(Thread.currentThread() instanceof IgniteDiscoveryThread) && locNodeIsSrv) {
+                if (node.isClient() && (addrs.size() - skippedAddrs == failedAddrsSet.size())) {
+                    String msg = "Failed to connect to all addresses of node " + node.id() + ": " + failedAddrsSet +
+                        "; inverse connection will be requested.";
+
+                    throw new NodeUnreachableException(msg);
+                }
+            }
+
             processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
+        }
 
         return ses;
     }
@@ -3898,6 +4064,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         );
     }
 
+    /**
+     * Checks if exception indicates that client is unreachable.
+     *
+     * @param e Exception to check.
+     * @return {@code True} if exception shows that client is unreachable, {@code false} otherwise.
+     */
+    private boolean isNodeUnreachableException(Exception e) {
+        return e instanceof SocketTimeoutException;
+    }
+
     /** */
     private IgniteSpiOperationTimeoutException handshakeTimeoutException() {
         return new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
@@ -4211,6 +4387,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      *
      * FOR TEST PURPOSES ONLY!!!
      */
+    @TestOnly
     public void simulateNodeFailure() {
         if (nioSrvr != null)
             nioSrvr.stop();
@@ -4281,6 +4458,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param node Node.
+     * @return {@code True} if the remote node cannot receive TCP connections. Applicable for client nodes only.
+     */
+    private boolean forceClientToServerConnections(ClusterNode node) {
+        Boolean forceClientToSrvConnections = node.attribute(createSpiAttributeName(ATTR_FORCE_CLIENT_SERVER_CONNECTIONS));
+
+        return Boolean.TRUE.equals(forceClientToSrvConnections);
+    }
+
+    /**
      * @param recoveryDescs Descriptors map.
      * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param node Node.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestFuture.java
new file mode 100644
index 0000000..02531ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestFuture.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+
+/**
+ * Marker future implementation, just like {@code ConnectFuture}, but meaning that we're waiting for the inverse
+ * connection.
+ */
+public class ConnectionRequestFuture extends GridFutureAdapter<GridCommunicationClient> {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestor.java
similarity index 50%
copy from modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
copy to modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestor.java
index a54b67e..686e2cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionRequestor.java
@@ -15,28 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.spi.discovery.tcp.messages;
+package org.apache.ignite.spi.communication.tcp.internal;
 
-import java.util.UUID;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Wrapped for custom message that must not be delivered to the client nodes.
- */
-@TcpDiscoveryEnsureDelivery
-public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustomEventMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 
+/** Helper interface to ask other nodes to open connections. */
+public interface ConnectionRequestor {
     /**
-     * @param creatorNodeId Creator node id.
-     * @param msg Message.
-     * @param msgBytes Serialized message.
+     * Request opening of TCP connection from node {@code node} with index {@code connIdx}.
+     *
+     * @param node Node.
+     * @param connIdx Connection index.
      */
-    public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
-        @NotNull byte[] msgBytes) {
-        super(creatorNodeId, msg, msgBytes);
-    }
+    public void request(ClusterNode node, int connIdx) throws IgniteCheckedException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/NodeUnreachableException.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/NodeUnreachableException.java
new file mode 100644
index 0000000..d947679
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/NodeUnreachableException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Exception is thrown by {@link TcpCommunicationSpi} when some or all addresses of a node are unreachable and
+ * direct communication connection cannot be established.
+ *
+ * <p>
+ *     Ability to open direct connections between any nodes in cluster in any direction
+ *     is necessary for proper functioning of the cluster.
+ * </p>
+ * <p>
+ *     However, if some nodes are deployed without open public IPs (e.g. client deployed in a Kubernetes environment)
+ *     this invariant is broken: these nodes still can open connections to other nodes
+ *     but no other nodes are able to connect to such nodes.
+ * </p>
+ * <p>
+ *     To enable connections to such "hidden" nodes inverse connection protocol is used: when a node detects
+ *     that it cannot reach this "hidden" node it throws this exception and triggers the protocol.
+ * </p>
+ */
+public class NodeUnreachableException extends IgniteSpiException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public NodeUnreachableException(String msg) {
+        super(msg);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionIndexAwareMessage.java
similarity index 51%
copy from modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
copy to modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionIndexAwareMessage.java
index a54b67e..12ce69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionIndexAwareMessage.java
@@ -15,28 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.spi.discovery.tcp.messages;
+package org.apache.ignite.spi.communication.tcp.internal;
 
-import java.util.UUID;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
-/**
- * Wrapped for custom message that must not be delivered to the client nodes.
- */
-@TcpDiscoveryEnsureDelivery
-public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustomEventMessage {
+/** */
+public interface TcpConnectionIndexAwareMessage extends Message {
     /** */
-    private static final long serialVersionUID = 0L;
+    public static final int UNDEFINED_CONNECTION_INDEX = -1;
 
     /**
-     * @param creatorNodeId Creator node id.
-     * @param msg Message.
-     * @param msgBytes Serialized message.
+     * @return {@link #UNDEFINED_CONNECTION_INDEX} if standard index has to be used. Desired connection index otherwise.
      */
-    public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
-        @NotNull byte[] msgBytes) {
-        super(creatorNodeId, msg, msgBytes);
-    }
+    public int connectionIndex();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionRequestDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionRequestDiscoveryMessage.java
new file mode 100644
index 0000000..c512c22
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpConnectionRequestDiscoveryMessage.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message is part of communication via discovery protocol.
+ *
+ * It is used when a node (say node A) cannot establish a communication connection to other node (node B) in topology
+ * due to firewall or network configuration and sends this message requesting inverse connection:
+ * node B receives request and opens communication connection to node A
+ * thus allowing both nodes to communicate with each other.
+ */
+public class TcpConnectionRequestDiscoveryMessage implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** */
+    @GridToStringInclude
+    private final UUID receiverNodeId;
+
+    /** */
+    @GridToStringInclude
+    private final int connIdx;
+
+    /** */
+    public TcpConnectionRequestDiscoveryMessage(UUID receiverNodeId, int connIdx) {
+        this.receiverNodeId = receiverNodeId;
+        this.connIdx = connIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** */
+    public UUID receiverNodeId() {
+        return receiverNodeId;
+    }
+
+    /** */
+    public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
+        throw new UnsupportedOperationException("createDiscoCache");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpConnectionRequestDiscoveryMessage.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java
similarity index 55%
copy from modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
copy to modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java
index c1c7d97..388d5de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java
@@ -15,84 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.managers.communication;
+package org.apache.ignite.spi.communication.tcp.internal;
 
-import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
+ * Inverse connection response message sent by client node as a response to
+ * inverse connection request received by discovery.
  *
+ * The main purpose of this message is to communicate back to the server node the connection index of a thread
+ * waiting for the communication connection to be established.
  */
-public class GridIoSecurityAwareMessage extends GridIoMessage {
+public class TcpInverseConnectionResponseMessage implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    public static final short TYPE_CODE = 174;
+    private int connIdx;
 
-    /** Security subject id that will be used during message processing on an remote node. */
-    private UUID secSubjId;
-
-    /**
-     * No-op constructor to support {@link Externalizable} interface.
-     * This constructor is not meant to be used for other purposes.
-     */
-    public GridIoSecurityAwareMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param secSubjId Security subject id.
-     * @param plc Policy.
-     * @param topic Communication topic.
-     * @param topicOrd Topic ordinal value.
-     * @param msg Message.
-     * @param ordered Message ordered flag.
-     * @param timeout Timeout.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     */
-    public GridIoSecurityAwareMessage(
-        UUID secSubjId,
-        byte plc,
-        Object topic,
-        int topicOrd,
-        Message msg,
-        boolean ordered,
-        long timeout,
-        boolean skipOnTimeout) {
-        super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
-
-        this.secSubjId = secSubjId;
-    }
-
-    /**
-     * @return Security subject id.
-     */
-    UUID secSubjId() {
-        return secSubjId;
+    /** */
+    public TcpInverseConnectionResponseMessage() {
     }
 
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return TYPE_CODE;
+    /** */
+    public TcpInverseConnectionResponseMessage(int connIdx) {
+        this.connIdx = connIdx;
     }
 
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 8;
+    /** */
+    public int connectionIndex() {
+        return connIdx;
     }
 
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
-        if (!super.writeTo(buf, writer))
-            return false;
-
         if (!writer.isHeaderWritten()) {
             if (!writer.writeHeader(directType(), fieldsCount()))
                 return false;
@@ -101,8 +63,8 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
         }
 
         switch (writer.state()) {
-            case 7:
-                if (!writer.writeUuid("secSubjId", secSubjId))
+            case 0:
+                if (!writer.writeInt("connIdx", connIdx))
                     return false;
 
                 writer.incrementState();
@@ -113,18 +75,20 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf, reader))
-            return false;
-
         switch (reader.state()) {
-            case 7:
-                secSubjId = reader.readUuid("secSubjId");
+            case 0:
+                connIdx = reader.readInt("connIdx");
 
                 if (!reader.isLastRead())
                     return false;
@@ -133,6 +97,21 @@ public class GridIoSecurityAwareMessage extends GridIoMessage {
 
         }
 
-        return reader.afterMessageRead(GridIoSecurityAwareMessage.class);
+        return reader.afterMessageRead(TcpInverseConnectionResponseMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 177;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpInverseConnectionResponseMessage.class, this);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
index a54b67e..97f701e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import java.util.UUID;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapped for custom message that must not be delivered to the client nodes.
@@ -35,7 +34,7 @@ public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustom
      * @param msg Message.
      * @param msgBytes Serialized message.
      */
-    public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
+    public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg,
         @NotNull byte[] msgBytes) {
         super(creatorNodeId, msg, msgBytes);
     }
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 82e06cb..6abdef7 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 org.apache.ignite.IgniteAuthenticationException
 org.apache.ignite.IgniteCacheRestartingException
 org.apache.ignite.IgniteCheckedException
@@ -2490,6 +2489,7 @@ org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi$9
 org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi$HandshakeClosure
 org.apache.ignite.spi.communication.tcp.internal.HandshakeException
 org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture$SingleAddressConnectFuture$1
+org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage
 org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage
 org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2
 org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
index c37fa4d..997594a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -31,10 +33,12 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -45,6 +49,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 
 /**
  * Test for customer scenario.
@@ -299,6 +305,63 @@ public class IgniteCacheClientReconnectTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Verifies that new node ID generated by client on disconnect replaces old ID only on RECONNECTED event.
+     *
+     * Old node ID is still available on DISCONNECTED event.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientIdUpdateOnReconnect() throws Exception {
+        startGrid(0);
+
+        IgniteEx clNode = startClientGrid(1);
+        UUID oldNodeId = clNode.localNode().id();
+
+        awaitPartitionMapExchange();
+
+        stopGrid(0);
+
+        AtomicReference<UUID> idOnDisconnect = new AtomicReference<>();
+        AtomicReference<UUID> idOnReconnect = new AtomicReference<>();
+
+        CountDownLatch disconnectedLatch = new CountDownLatch(1);
+        CountDownLatch reconnectedLatch = new CountDownLatch(1);
+
+        clNode.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event event) {
+                switch (event.type()) {
+                    case EVT_CLIENT_NODE_DISCONNECTED: {
+                        idOnDisconnect.set(event.node().id());
+
+                        disconnectedLatch.countDown();
+
+                        break;
+                    }
+
+                    case EVT_CLIENT_NODE_RECONNECTED: {
+                        idOnReconnect.set(event.node().id());
+
+                        reconnectedLatch.countDown();
+
+                        break;
+                    }
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> disconnectedLatch.getCount() == 0, 10_000));
+        assertEquals(oldNodeId, idOnDisconnect.get());
+
+        startGrid(0);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> reconnectedLatch.getCount() == 0, 10_000));
+        assertEquals(grid(1).localNode().id(), idOnReconnect.get());
+    }
+
+    /**
      * @param ignite Ignite.
      */
     private void putGet(Ignite ignite) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 115a1fc..650ea1a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -60,6 +60,7 @@ import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX;
 
 /**
  * Tests for {@code IgniteDataStreamerImpl}.
@@ -609,7 +610,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
                             appMsg,
                             GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"),
                             ioMsg.timeout(),
-                            ioMsg.skipOnTimeout());
+                            ioMsg.skipOnTimeout(),
+                            UNDEFINED_CONNECTION_INDEX
+                        );
 
                         needStaleTop = false;
                     }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
new file mode 100644
index 0000000..ecc56e3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+/**
+ * Tests for communication over discovery feature (inverse communication request).
+ */
+public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridCommonAbstractTest {
+
+    /** */
+    private static final String UNREACHABLE_IP = "172.31.30.132";
+
+    /** */
+    private static final String UNRESOLVED_HOST = "unresolvedHost";
+
+    /** */
+    private static final String CACHE_NAME = "cache-0";
+
+    /** */
+    private static final AtomicReference<String> UNREACHABLE_DESTINATION = new AtomicReference<>();
+
+    /** Allows to make client not to respond to inverse connection request. */
+    private static final AtomicBoolean RESPOND_TO_INVERSE_REQUEST = new AtomicBoolean(true);
+
+    /** */
+    private static final int SRVS_NUM = 2;
+
+    /** */
+    private boolean clientMode;
+
+    /** */
+    private boolean forceClientToSrvConnections;
+
+    /** */
+    private CacheConfiguration ccfg;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureDetectionTimeout(8_000);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        if (ccfg != null) {
+            cfg.setCacheConfiguration(ccfg);
+
+            ccfg = null;
+        }
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * Verifies that server successfully connects to "unreachable" client with
+     * {@link TcpCommunicationSpi#forceClientToServerConnections()} flag.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUnreachableClientInVirtualizedEnvironment() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNREACHABLE_IP);
+        RESPOND_TO_INVERSE_REQUEST.set(true);
+
+        executeCacheTestWithUnreachableClient(true);
+    }
+
+    /**
+     * Verifies that server successfully connects to "unreachable" client with
+     * {@link TcpCommunicationSpi#forceClientToServerConnections()} flag disabled.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUnreachableClientInStandAloneEnvironment() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNREACHABLE_IP);
+        RESPOND_TO_INVERSE_REQUEST.set(true);
+
+        executeCacheTestWithUnreachableClient(false);
+    }
+
+    /**
+     * Verifies that server successfully connects to client provided unresolvable host in virtualized environment.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientWithUnresolvableHostInVirtualizedEnvironment() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+        RESPOND_TO_INVERSE_REQUEST.set(true);
+
+        executeCacheTestWithUnreachableClient(true);
+    }
+
+    /**
+     * Verifies that server successfully connects to client provided unresolvable host in stand-alone environment.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientWithUnresolvableHostInStandAloneEnvironment() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+        RESPOND_TO_INVERSE_REQUEST.set(true);
+
+        executeCacheTestWithUnreachableClient(false);
+    }
+
+    /**
+     * Verify that inverse connection can be established if client reconnects to another router server with the same id.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientReconnectDuringInverseConnection() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+        RESPOND_TO_INVERSE_REQUEST.set(true);
+
+        Assume.assumeThat(System.getProperty("zookeeper.forceSync"), is(nullValue()));
+
+        startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+        startGrid(1, (UnaryOperator<IgniteConfiguration>) cfg -> {
+            cfg.setClientMode(true);
+
+            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(new TcpDiscoveryVmIpFinder(false)
+                .setAddresses(
+                    Collections.singletonList("127.0.0.1:47500..47502") // "47501" is a port of the client itself.
+                )
+            );
+
+            return cfg;
+        });
+
+        AtomicBoolean msgRcvd = new AtomicBoolean();
+
+        grid(1).context().io().addMessageListener(GridTopic.TOPIC_IO_TEST, (nodeId, msg, plc) -> {
+            msgRcvd.set(true);
+        });
+
+        UUID clientNodeId = grid(1).context().localNodeId();
+        UUID oldRouterNode = ((TcpDiscoveryNode)grid(1).localNode()).clientRouterNodeId();
+
+        startGrid(2);
+
+        startGrid(3);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            ClusterNode clientNode = grid(3).context().discovery().node(clientNodeId);
+
+            grid(3).context().io().sendIoTest(clientNode, new byte[10], false);
+        });
+
+        doSleep(2000L); // Client failover timeout is 8 seconds.
+
+        stopGrid(0);
+
+        fut.get(8000L);
+
+        UUID newId = grid(1).localNode().id();
+        UUID newRouterNode = ((TcpDiscoveryNode)grid(1).localNode()).clientRouterNodeId();
+
+        assertEquals(clientNodeId, newId);
+        assertFalse(oldRouterNode + " " + newRouterNode, newRouterNode.equals(oldRouterNode));
+
+        assertTrue(GridTestUtils.waitForCondition(msgRcvd::get, 1000L));
+    }
+
+    /**
+     * Executes cache test with "unreachable" client.
+     *
+     * @param forceClientToSrvConnections Value of the force-client-to-server-connections flag used for the client node.
+     * @throws Exception If failed.
+     */
+    private void executeCacheTestWithUnreachableClient(boolean forceClientToSrvConnections) throws Exception {
+        LogListener lsnr = LogListener.matches("Failed to send message to remote node").atMost(0).build();
+
+        for (int i = 0; i < SRVS_NUM; i++) {
+            ccfg = cacheConfiguration(CACHE_NAME, ATOMIC);
+
+            startGrid(i, cfg -> {
+                ListeningTestLogger log = new ListeningTestLogger(false, cfg.getGridLogger());
+
+                log.registerListener(lsnr);
+
+                return cfg.setGridLogger(log);
+            });
+        }
+
+        clientMode = true;
+        this.forceClientToSrvConnections = forceClientToSrvConnections;
+
+        startGrid(SRVS_NUM);
+
+        putAndCheckKey();
+
+        assertTrue(lsnr.check());
+    }
+
+    /**
+     * No server threads hang even if client doesn't respond to inverse connection request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testClientSkipsInverseConnectionResponse() throws Exception {
+        UNREACHABLE_DESTINATION.set(UNRESOLVED_HOST);
+        RESPOND_TO_INVERSE_REQUEST.set(false);
+
+        startGrids(SRVS_NUM - 1);
+
+        LogListener lsnr = LogListener.matches(
+            "Failed to wait for establishing inverse communication connection"
+        ).build();
+
+        startGrid(SRVS_NUM - 1, cfg -> {
+            ListeningTestLogger log = new ListeningTestLogger(false, cfg.getGridLogger());
+
+            log.registerListener(lsnr);
+
+            return cfg.setGridLogger(log);
+        });
+
+        clientMode = true;
+        forceClientToSrvConnections = false;
+
+        IgniteEx client = startGrid(SRVS_NUM);
+        ClusterNode clientNode = client.localNode();
+
+        IgniteEx srv = grid(SRVS_NUM - 1);
+
+        // We need to interrupt the communication worker of the client node so that
+        // a closed connection won't automatically reopen when we don't expect it.
+        // The server communication worker is interrupted for another reason - it can hang the test
+        // due to a bug in the inverse connection protocol & comm worker - it will be fixed later.
+        List<Thread> tcpCommWorkerThreads = Thread.getAllStackTraces().keySet().stream()
+            .filter(t -> t.getName().contains("tcp-comm-worker"))
+            .filter(t -> t.getName().contains(srv.name()) || t.getName().contains(client.name()))
+            .collect(Collectors.toList());
+
+        for (Thread tcpCommWorkerThread : tcpCommWorkerThreads) {
+            U.interrupt(tcpCommWorkerThread);
+
+            U.join(tcpCommWorkerThread, log);
+        }
+
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        GridTestUtils.invoke(spi, "onNodeLeft", clientNode.consistentId(), clientNode.id());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            srv.context().io().sendIoTest(clientNode, new byte[10], false).get()
+        );
+
+        assertTrue(GridTestUtils.waitForCondition(fut::isDone, 30_000));
+
+        assertTrue(lsnr.check());
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Atomicity mode.
+     * @return Cache configuration.
+     */
+    protected final CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration(name);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /**
+     * Puts a key to a server that is backup for the key and doesn't have an open communication connection to client.
+     * This forces the server to establish a connection to "unreachable" client.
+     */
+    private void putAndCheckKey() {
+        int key = 0;
+        IgniteEx srv2 = grid(SRVS_NUM - 1);
+
+        for (int i = 0; i < 1_000; i++) {
+            if (srv2.affinity(CACHE_NAME).isBackup(srv2.localNode(), i)) {
+                key = i;
+
+                break;
+            }
+        }
+
+        IgniteEx cl0 = grid(SRVS_NUM);
+
+        IgniteCache<Object, Object> cache = cl0.cache(CACHE_NAME);
+
+        cache.put(key, key);
+        assertEquals(key, cache.get(key));
+    }
+
+    /** */
+    private class TestCommunicationSpi extends TcpCommunicationSpi {
+        {
+            setForceClientToServerConnections(forceClientToSrvConnections);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+            if (node.isClient()) {
+                Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+                attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton(UNREACHABLE_DESTINATION.get()));
+                attrs.put(createAttributeName(ATTR_PORT), 47200);
+                attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+                attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+                ((TcpDiscoveryNode)(node)).setAttributes(attrs);
+            }
+
+            return super.createTcpClient(node, connIdx);
+        }
+
+        /**
+         * @param name Name.
+         */
+        private String createAttributeName(String name) {
+            return getClass().getSimpleName() + '.' + name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage msg0 = (GridIoMessage)msg;
+
+                if (msg0.message() instanceof TcpInverseConnectionResponseMessage && !RESPOND_TO_INVERSE_REQUEST.get()) {
+                    log.info("Client skips inverse connection response to server: " + node);
+
+                    return;
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index fcde6fc..b237be7 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.FactoryBuilder;
 import javax.management.DynamicMBean;
@@ -671,6 +672,8 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
     private void beforeFirstTest() throws Exception {
         sharedStaticIpFinder = new TcpDiscoveryVmIpFinder(true);
 
+        clsLdr = Thread.currentThread().getContextClassLoader();
+
         U.quietAndInfo(log(), ">>> Starting test class: " + testClassDescription() + " <<<");
 
         if (isSafeTopology())
@@ -696,6 +699,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
             t.printStackTrace();
 
             try {
+                // This is a very questionable solution.
                 cleanUpTestEnviroment();
             }
             catch (Exception e) {
@@ -983,6 +987,18 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
     }
 
     /**
+     * Starts new grid with given index.
+     *
+     * @param idx Index of the grid to start.
+     * @param cfgOp Configuration mutator. Can be used to avoid overcomplication of {@link #getConfiguration()}.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected IgniteEx startGrid(int idx, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
+        return startGrid(getTestIgniteInstanceName(idx), cfgOp);
+    }
+
+    /**
      * Starts new client grid with given index.
      *
      * @param idx Index of the grid to start.
@@ -1049,6 +1065,18 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
      * Starts new grid with given name.
      *
      * @param igniteInstanceName Ignite instance name.
+     * @param cfgOp Configuration mutator. Can be used to avoid overcomplication of {@link #getConfiguration()}.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected IgniteEx startGrid(String igniteInstanceName, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
+        return (IgniteEx)startGrid(igniteInstanceName, cfgOp, null);
+    }
+
+    /**
+     * Starts new grid with given name.
+     *
+     * @param igniteInstanceName Ignite instance name.
      * @param ctx Spring context.
      * @return Started grid.
      * @throws Exception If failed.
@@ -1083,6 +1111,24 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
      * Starts new grid with given name.
      *
      * @param igniteInstanceName Ignite instance name.
+     * @param cfgOp Configuration mutator. Can be used to avoid overcomplication of {@link #getConfiguration()}.
+     * @param ctx Spring context.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected Ignite startGrid(String igniteInstanceName, UnaryOperator<IgniteConfiguration> cfgOp, GridSpringResourceContext ctx) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(igniteInstanceName));
+
+        if (cfgOp != null)
+            cfg = cfgOp.apply(cfg);
+
+        return startGrid(igniteInstanceName, cfg, ctx);
+    }
+
+    /**
+     * Starts new grid with given name.
+     *
+     * @param igniteInstanceName Ignite instance name.
      * @param ctx Spring context.
      * @return Started grid.
      * @throws Exception If failed.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 30ae6f8..ca62134 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testsuites;
 
+import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationInverseConnectionEstablishingTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiConcurrentConnectSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiConcurrentConnectSslSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiConfigSelfTest;
@@ -100,7 +101,9 @@ import org.junit.runners.Suite;
     IgniteTcpCommunicationConnectOnInitTest.class,
 
     TcpCommunicationSpiMultiJvmTest.class,
-    TooManyOpenFilesTcpCommunicationSpiTest.class
+    TooManyOpenFilesTcpCommunicationSpiTest.class,
+
+    GridTcpCommunicationInverseConnectionEstablishingTest.class
 
     //GridCacheDhtLockBackupSelfTest.class,
 })


Mime
View raw message