ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [10/50] [abbrv] ignite git commit: ignite-1758 Fixed issues with client reconnect handling
Date Sun, 08 Nov 2015 17:23:06 GMT
ignite-1758 Fixed issues with client reconnect handling


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ea3b562
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ea3b562
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ea3b562

Branch: refs/heads/ignite-1702
Commit: 6ea3b56205de19ceac89762d9c20c3fe62ab13b9
Parents: 04964b9
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Oct 30 16:33:40 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 30 16:33:40 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../apache/ignite/internal/IgniteKernal.java    |  14 +-
 .../processors/cache/GridCacheProcessor.java    |  77 +++--
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../CacheObjectPortableProcessorImpl.java       |   9 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  11 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  40 ++-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 205 ++++++++-----
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 213 +++++++++----
 .../messages/TcpDiscoveryAbstractMessage.java   |  11 +
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  39 +++
 .../IgniteClientReconnectCacheTest.java         |  33 ++
 .../cache/GridCacheAbstractFullApiSelfTest.java |   3 +
 .../CacheGetFutureHangsSelfTest.java            |   8 +
 .../IgniteCacheClientReconnectTest.java         |   2 +
 .../distributed/IgniteCacheManyClientsTest.java |  14 +-
 ...gniteClientReconnectMassiveShutdownTest.java | 303 +++++++++++++++++++
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 285 +++++++++++++----
 18 files changed, 1021 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1e7d002..de7c10b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -355,6 +355,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for affinity assignment history. */
     public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
 
+    /** Maximum size for discovery messages history. */
+    public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";
+
     /** Number of cache operation retries in case of topology exceptions. */
     public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4820a93..5a0fe16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -165,6 +165,7 @@ import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiVersionCheckException;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
@@ -3158,10 +3159,17 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /** {@inheritDoc} */
     public void dumpDebugInfo() {
-        U.warn(log, "Dumping debug info for node [id=" + ctx.localNodeId() +
+        boolean client = ctx.clientNode();
+
+        ClusterNode locNode = ctx.discovery().localNode();
+
+        UUID routerId = locNode instanceof TcpDiscoveryNode ? ((TcpDiscoveryNode)locNode).clientRouterNodeId() : null;
+
+        U.warn(log, "Dumping debug info for node [id=" + locNode.id() +
             ", name=" + ctx.gridName() +
-            ", order=" + ctx.discovery().localNode().order() +
-            ", client=" + ctx.clientNode() + ']');
+            ", order=" + locNode.order() +
+            ", client=" + client +
+            (client && routerId != null ? ", routerId=" + routerId : "") + ']');
 
         ctx.cache().context().exchange().dumpDebugInfo();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5bf4ac7..301e7d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1803,61 +1803,80 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+        boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+
         // Collect dynamically started caches to a single object.
-        Collection<DynamicCacheChangeRequest> reqs =
-            new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
+        Collection<DynamicCacheChangeRequest> reqs;
 
-        boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+        Map<String, Map<UUID, Boolean>> clientNodesMap;
 
-        Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
+        if (reconnect) {
+            reqs = new ArrayList<>(caches.size());
 
-        for (DynamicCacheDescriptor desc : descs.values()) {
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+            clientNodesMap = U.newHashMap(caches.size());
 
-            req.startCacheConfiguration(desc.cacheConfiguration());
+            for (GridCacheAdapter<?, ?> cache : caches.values()) {
+                DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
 
-            req.cacheType(desc.cacheType());
+                if (desc == null)
+                    continue;
 
-            req.deploymentId(desc.deploymentId());
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
 
-            reqs.add(req);
-        }
+                req.startCacheConfiguration(desc.cacheConfiguration());
 
-        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+                req.cacheType(desc.cacheType());
 
-            req.startCacheConfiguration(desc.cacheConfiguration());
+                req.deploymentId(desc.deploymentId());
 
-            req.template(true);
+                reqs.add(req);
 
-            req.deploymentId(desc.deploymentId());
+                Boolean nearEnabled = cache.isNear();
+
+                Map<UUID, Boolean> map = U.newHashMap(1);
+
+                map.put(nodeId, nearEnabled);
 
-            reqs.add(req);
+                clientNodesMap.put(cache.name(), map);
+            }
         }
+        else {
+            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
-        DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
-        Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
+                req.startCacheConfiguration(desc.cacheConfiguration());
 
-        if (reconnect) {
-            clientNodesMap = U.newHashMap(caches.size());
+                req.cacheType(desc.cacheType());
 
-            for (GridCacheAdapter<?, ?> cache : caches.values()) {
-                Boolean nearEnabled = cache.isNear();
+                req.deploymentId(desc.deploymentId());
 
-                Map<UUID, Boolean> map = U.newHashMap(1);
+                reqs.add(req);
+            }
 
-                map.put(nodeId, nearEnabled);
+            for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
-                clientNodesMap.put(cache.name(), map);
+                req.startCacheConfiguration(desc.cacheConfiguration());
+
+                req.template(true);
+
+                req.deploymentId(desc.deploymentId());
+
+                reqs.add(req);
             }
+
+            clientNodesMap = ctx.discovery().clientNodesMap();
         }
 
-        req.clientNodes(clientNodesMap);
+        DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
+
+        batch.clientNodes(clientNodesMap);
 
-        req.clientReconnect(reconnect);
+        batch.clientReconnect(reconnect);
 
-        return req;
+        return batch;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 83867f4..356a85b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -192,9 +192,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         ClusterNode loc = cctx.localNode();
 
-        long startTime = loc.metrics().getStartTime();
-
-        assert startTime > 0;
+        assert loc.metrics().getStartTime() > 0;
 
         final long startTopVer = loc.order();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
index 2de9d84..f0319aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -39,6 +39,8 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.portable.api.IgnitePortables;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.lang.GridMapEntry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -371,6 +374,12 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
                     else
                         throw e;
                 }
+                catch (CacheException e) {
+                    if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
+                        continue;
+                    else
+                        throw e;
+                }
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 88837de..5647239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -193,14 +193,19 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * Node left callback.
+     *
+     * @return {@code False} if descriptor is reserved.
      */
-    public void onNodeLeft() {
+    public boolean onNodeLeft() {
         GridNioFuture<?>[] futs = null;
 
         synchronized (this) {
             nodeLeft = true;
 
-            if (!reserved && !msgFuts.isEmpty()) {
+            if (reserved)
+                return false;
+
+            if (!msgFuts.isEmpty()) {
                 futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
 
                 msgFuts.clear();
@@ -209,6 +214,8 @@ public class GridNioRecoveryDescriptor {
 
         if (futs != null)
             completeOnNodeLeft(futs);
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5ea2c02..e8bd8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -61,6 +61,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -1358,7 +1359,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public int getOutboundMessagesQueueSize() {
-        return nioSrvr.outboundMessagesQueueSize();
+        GridNioServer<Message> srv = nioSrvr;
+
+        return srv != null ? srv.outboundMessagesQueueSize() : 0;
     }
 
     /** {@inheritDoc} */
@@ -1870,25 +1873,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      *
      * @param node Destination node.
      * @param msg Message to send.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
      *      Note that this is not guaranteed that failed communication will result
      *      in thrown exception as this is dependant on SPI implementation.
      */
-    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
         throws IgniteSpiException {
-        sendMessage0(node, msg, ackClosure);
+        sendMessage0(node, msg, ackC);
     }
 
     /**
      * @param node Destination node.
      * @param msg Message to send.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
      *      Note that this is not guaranteed that failed communication will result
      *      in thrown exception as this is dependant on SPI implementation.
      */
-    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
         throws IgniteSpiException {
         assert node != null;
         assert msg != null;
@@ -1896,13 +1899,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
 
-        ClusterNode localNode = getLocalNode();
+        ClusterNode locNode = getLocalNode();
 
-        if (localNode == null)
+        if (locNode == null)
             throw new IgniteSpiException("Local node has not been started or fully initialized " +
                 "[isStopping=" + getSpiContext().isStopping() + ']');
 
-        if (node.id().equals(localNode.id()))
+        if (node.id().equals(locNode.id()))
             notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;
@@ -1915,10 +1918,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     UUID nodeId = null;
 
-                    if (!client.async() && !localNode.version().equals(node.version()))
+                    if (!client.async() && !locNode.version().equals(node.version()))
                         nodeId = node.id();
 
-                    retry = client.sendMessage(nodeId, msg, ackClosure);
+                    retry = client.sendMessage(nodeId, msg, ackC);
 
                     client.release();
 
@@ -2292,6 +2295,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         return null;
                     }
 
+                    if (getSpiContext().node(node.id()) == null) {
+                        recoveryDesc.release();
+
+                        U.closeQuiet(ch);
+
+                        throw new ClusterTopologyCheckedException("Failed to send message, " +
+                            "node left cluster: " + node);
+                    }
+
                     long rcvCnt = -1;
 
                     SSLEngine sslEngine = null;
@@ -3100,10 +3112,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 assert !left.isEmpty();
 
                 for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
+                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
 
-                    if (recoverySnd != null)
-                        recoverySnd.onNodeLeft();
+                    if (recoverySnd != null && recoverySnd.onNodeLeft())
+                        recoveryDescs.remove(id);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e4c29db..a4619c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -270,8 +270,6 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void spiStop() throws IgniteSpiException {
-        timer.cancel();
-
         if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
             msgWorker.addMessage(SPI_STOP);
 
@@ -297,6 +295,8 @@ class ClientImpl extends TcpDiscoveryImpl {
         U.join(sockWriter, log);
         U.join(sockReader, log);
 
+        timer.cancel();
+
         spi.printStopInfo();
     }
 
@@ -461,7 +461,8 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
+    @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+        throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
@@ -501,7 +502,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 InetSocketAddress addr = it.next();
 
-                T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
+                T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
                     it.remove();
@@ -511,11 +512,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
 
-                Socket sock = sockAndRes.get1();
+                Socket sock = sockAndRes.get1().socket();
 
                 switch (sockAndRes.get2()) {
                     case RES_OK:
-                        return new T2<>(sock, sockAndRes.get3());
+                        return new T2<>(sockAndRes.get1(), sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
@@ -548,7 +549,7 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @param addr Address.
      * @return Socket, connect response and client acknowledge support flag.
      */
-    @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
         assert addr != null;
 
         if (log.isDebugEnabled())
@@ -621,7 +622,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+                return new T3<>(new SocketStream(sock),
+                    spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
                     res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
@@ -708,7 +710,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Collection<ClusterNode> top = topHist.get(topVer);
 
-            assert top != null : msg;
+            assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']';
 
             return top;
         }
@@ -765,7 +767,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void brakeConnection() {
-        U.closeQuiet(msgWorker.currSock);
+        SocketStream sockStream = msgWorker.currSock;
+
+        if (sockStream != null)
+            U.closeQuiet(sockStream.socket());
     }
 
     /** {@inheritDoc} */
@@ -826,7 +831,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Object mux = new Object();
 
         /** */
-        private Socket sock;
+        private SocketStream sockStream;
 
         /** */
         private UUID rmtNodeId;
@@ -838,12 +843,12 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param sock Socket.
+         * @param sockStream Socket.
          * @param rmtNodeId Rmt node id.
          */
-        public void setSocket(Socket sock, UUID rmtNodeId) {
+        public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
             synchronized (mux) {
-                this.sock = sock;
+                this.sockStream = sockStream;
 
                 this.rmtNodeId = rmtNodeId;
 
@@ -854,22 +859,24 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
-                Socket sock;
+                SocketStream sockStream;
                 UUID rmtNodeId;
 
                 synchronized (mux) {
-                    if (this.sock == null) {
+                    if (this.sockStream == null) {
                         mux.wait();
 
                         continue;
                     }
 
-                    sock = this.sock;
+                    sockStream = this.sockStream;
                     rmtNodeId = this.rmtNodeId;
                 }
 
+                Socket sock = sockStream.socket();
+
                 try {
-                    InputStream in = new BufferedInputStream(sock.getInputStream());
+                    InputStream in = sockStream.stream();
 
                     sock.setKeepAlive(true);
                     sock.setTcpNoDelay(true);
@@ -912,18 +919,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         boolean ack = msg instanceof TcpDiscoveryClientAckResponse;
 
-                        if (!ack) {
-                            if (spi.ensured(msg) && joinLatch.getCount() == 0L)
-                                lastMsgId = msg.id();
-
+                        if (!ack)
                             msgWorker.addMessage(msg);
-                        }
                         else
                             sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
                     }
                 }
                 catch (IOException e) {
-                    msgWorker.addMessage(new SocketClosedMessage(sock));
+                    msgWorker.addMessage(new SocketClosedMessage(sockStream));
 
                     if (log.isDebugEnabled())
                         U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
@@ -932,8 +935,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     U.closeQuiet(sock);
 
                     synchronized (mux) {
-                        if (this.sock == sock) {
-                            this.sock = null;
+                        if (this.sockStream == sockStream) {
+                            this.sockStream = null;
                             this.rmtNodeId = null;
                         }
                     }
@@ -1125,7 +1128,7 @@ class ClientImpl extends TcpDiscoveryImpl {
      */
     private class Reconnector extends IgniteSpiThread {
         /** */
-        private volatile Socket sock;
+        private volatile SocketStream sockStream;
 
         /** */
         private boolean clientAck;
@@ -1148,7 +1151,10 @@ class ClientImpl extends TcpDiscoveryImpl {
         public void cancel() {
             interrupt();
 
-            U.closeQuiet(sock);
+            SocketStream sockStream = this.sockStream;
+
+            if (sockStream != null)
+                U.closeQuiet(sockStream.socket());
         }
 
         /** {@inheritDoc} */
@@ -1166,24 +1172,26 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
+                    T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
 
                     if (joinRes == null) {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ']'));
                         }
                         else
                             U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
-                                " configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                                " configuration  property) [networkTimeout=" + spi.netTimeout + ']');
 
                         return;
                     }
 
-                    sock = joinRes.get1();
+                    sockStream = joinRes.get1();
                     clientAck = joinRes.get2();
 
+                    Socket sock = sockStream.socket();
+
                     if (isInterrupted())
                         throw new InterruptedException();
 
@@ -1194,7 +1202,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         sock.setSoTimeout((int)spi.netTimeout);
 
-                        InputStream in = new BufferedInputStream(sock.getInputStream());
+                        InputStream in = sockStream.stream();
 
                         sock.setKeepAlive(true);
                         sock.setTcpNoDelay(true);
@@ -1264,11 +1272,16 @@ class ClientImpl extends TcpDiscoveryImpl {
             catch (IOException | IgniteCheckedException e) {
                 err = e;
 
+                success = false;
+
                 U.error(log, "Failed to reconnect", e);
             }
             finally {
                 if (!success) {
-                    U.closeQuiet(sock);
+                    SocketStream sockStream = this.sockStream;
+
+                    if (sockStream != null)
+                        U.closeQuiet(sockStream.socket());
 
                     if (join)
                         joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
@@ -1288,10 +1301,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
 
         /** */
-        private Socket currSock;
-
-        /** Indicates that pending messages are currently processed. */
-        private boolean pending;
+        private SocketStream currSock;
 
         /** */
         private Reconnector reconnector;
@@ -1338,11 +1348,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                     else if (msg == SPI_STOP) {
+                        boolean connected = state == CONNECTED;
+
                         state = STOPPED;
 
                         assert spi.getSpiContext().isStopping();
 
-                        if (currSock != null) {
+                        if (connected && currSock != null) {
                             TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
                             leftMsg.client(true);
@@ -1467,7 +1479,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
             finally {
-                U.closeQuiet(currSock);
+                SocketStream currSock = this.currSock;
+
+                if (currSock != null)
+                    U.closeQuiet(currSock.socket());
 
                 if (joinLatch.getCount() > 0)
                     joinError(new IgniteSpiException("Some error in join process.")); // This should not occur.
@@ -1490,7 +1505,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             joinCnt++;
 
-            T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+            T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
 
             if (joinRes == null) {
                 if (join)
@@ -1506,7 +1521,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             currSock = joinRes.get1();
 
-            sockWriter.setSocket(joinRes.get1(), joinRes.get2());
+            sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
 
             if (spi.joinTimeout > 0) {
                 final int joinCnt0 = joinCnt;
@@ -1551,6 +1566,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 processPingRequest();
 
             spi.stats.onMessageProcessingFinished(msg);
+
+            if (spi.ensured(msg) && state == CONNECTED)
+                lastMsgId = msg.id();
         }
 
         /**
@@ -1604,8 +1622,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (msg.topologyHistory() != null)
                             topHist.putAll(msg.topologyHistory());
                     }
-                    else if (log.isDebugEnabled())
-                        log.debug("Discarding node added message with empty topology: " + msg);
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node added message with empty topology: " + msg);
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Discarding node added message (this message has already been processed) " +
@@ -1625,8 +1645,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                             spi.onExchange(newNodeId, newNodeId, data, null);
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not added to topology: " + msg);
+                }
             }
         }
 
@@ -1653,6 +1675,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     locNode.order(topVer);
 
+                    for (Iterator<Long> it = topHist.keySet().iterator(); it.hasNext();) {
+                        if (it.next() >= topVer)
+                            it.remove();
+                    }
+
                     Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
 
                     notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
@@ -1712,7 +1739,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
                         ", node=" + node + ", top=" + top + ']';
 
-                    if (!pending && joinLatch.getCount() > 0) {
+                    if (state != CONNECTED) {
                         if (log.isDebugEnabled())
                             log.debug("Discarding node add finished message (join process is not finished): " + msg);
 
@@ -1725,8 +1752,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         spi.stats.onNodeJoined();
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not added to topology: " + msg);
+                }
             }
         }
 
@@ -1756,7 +1785,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
 
-                    if (!pending && joinLatch.getCount() > 0) {
+                    if (state != CONNECTED) {
                         if (log.isDebugEnabled())
                             log.debug("Discarding node left message (join process is not finished): " + msg);
 
@@ -1767,8 +1796,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     spi.stats.onNodeLeft();
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not added to topology: " + msg);
+                }
             }
         }
 
@@ -1809,7 +1840,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
 
-                if (!pending && joinLatch.getCount() > 0) {
+                if (state != CONNECTED) {
                     if (log.isDebugEnabled())
                         log.debug("Discarding node failed message (join process is not finished): " + msg);
 
@@ -1875,25 +1906,18 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (reconnector != null) {
                     assert msg.success() : msg;
 
-                    currSock = reconnector.sock;
+                    currSock = reconnector.sockStream;
 
-                    sockWriter.setSocket(currSock, reconnector.clientAck);
+                    sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
                     sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 
                     reconnector = null;
 
-                    pending = true;
-
-                    try {
-                        for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
+                    for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
 
-                            processDiscoveryMessage(pendingMsg);
-                        }
-                    }
-                    finally {
-                        pending = false;
+                        processDiscoveryMessage(pendingMsg);
                     }
                 }
                 else {
@@ -1921,7 +1945,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            if (msg.verified() && state == CONNECTED) {
+            if (state == CONNECTED) {
                 DiscoverySpiListener lsnr = spi.lsnr;
 
                 if (lsnr != null) {
@@ -2048,13 +2072,56 @@ class ClientImpl extends TcpDiscoveryImpl {
      */
     private static class SocketClosedMessage {
         /** */
+        private final SocketStream sock;
+
+        /**
+         * @param sock Socket.
+         */
+        private SocketClosedMessage(SocketStream sock) {
+            this.sock = sock;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class SocketStream {
+        /** */
         private final Socket sock;
 
+        /** */
+        private final InputStream in;
+
         /**
          * @param sock Socket.
+         * @throws IOException If failed to create stream.
          */
-        private SocketClosedMessage(Socket sock) {
+        public SocketStream(Socket sock) throws IOException {
+            assert sock != null;
+
             this.sock = sock;
+
+            this.in = new BufferedInputStream(sock.getInputStream());
+        }
+
+        /**
+         * @return Socket.
+         */
+        Socket socket() {
+            return sock;
+
+        }
+
+        /**
+         * @return Socket input stream.
+         */
+        InputStream stream() {
+            return in;
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return sock.toString();
         }
     }
 
@@ -2077,4 +2144,4 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         STOPPED
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b8df846..ee9f818 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -126,7 +126,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -154,6 +156,9 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 @SuppressWarnings("All")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
+    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
+
+    /** */
     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
 
@@ -1250,9 +1255,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             lsnr.onDiscovery(type, topVer, node, top, hist, null);
         }
-        else if (log.isDebugEnabled())
-            log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
-                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
+                    ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+        }
     }
 
     /**
@@ -1447,6 +1454,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             tmp = U.arrayList(readers);
         }
 
+        for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) {
+            U.interrupt(msgWorker);
+
+            U.join(msgWorker, log);
+        }
+
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
@@ -1744,22 +1757,36 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Discovery messages history used for client reconnect.
      */
     private class EnsuredMessageHistory {
-        /** */
-        private static final int MAX = 1024;
-
         /** Pending messages. */
-        private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final GridBoundedLinkedHashSet<TcpDiscoveryAbstractMessage>
+            msgs = new GridBoundedLinkedHashSet<>(ENSURED_MSG_HIST_SIZE);
 
         /**
          * @param msg Adds message.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            assert spi.ensured(msg) : msg;
+            assert spi.ensured(msg) && msg.verified() : msg;
+
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
 
-            msgs.addLast(msg);
+                TcpDiscoveryNode node = addedMsg.node();
 
-            while (msgs.size() > MAX)
-                msgs.pollFirst();
+                if (node.isClient() && !msgs.contains(msg)) {
+                    Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+
+                    Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size());
+
+                    for (TcpDiscoveryNode n0 : allNodes) {
+                        if (n0.internalOrder() != 0 && n0.internalOrder() < node.internalOrder())
+                            top.add(n0);
+                    }
+
+                    addedMsg.clientTopology(top);
+                }
+            }
+
+            msgs.add(msg);
         }
 
         /**
@@ -1782,11 +1809,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 for (TcpDiscoveryAbstractMessage msg : msgs) {
                     if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id()))
+                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
                             res = new ArrayList<>(msgs.size());
                     }
 
-                    if (res != null && msg.verified())
+                    if (res != null)
                         res.add(prepare(msg, node.id()));
                 }
 
@@ -1812,7 +1839,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (msg.id().equals(lastMsgId))
                             skip = false;
                     }
-                    else if (msg.verified())
+                    else
                         cp.add(prepare(msg, node.id()));
                 }
 
@@ -1820,7 +1847,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 if (log.isDebugEnabled()) {
                     if (cp == null)
-                        log.debug("Failed to find messages history [node=" + node + ", lastMsgId" + lastMsgId + ']');
+                        log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']');
                     else
                         log.debug("Found messages history [node=" + node + ", hist=" + cp + ']');
                 }
@@ -1835,8 +1862,21 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @return Prepared message.
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
-            if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null, null);
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                if (addedMsg.node().id().equals(destNodeId)) {
+                    assert addedMsg.clientTopology() != null : addedMsg;
+
+                    TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg);
+
+                    prepareNodeAddedMessage(msg0, destNodeId, null, null, null);
+
+                    msg0.topology(addedMsg.clientTopology());
+
+                    return msg0;
+                }
+            }
 
             return msg;
         }
@@ -2132,7 +2172,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg))
+            if (spi.ensured(msg) && redirectToClients(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -2161,19 +2201,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Sends message across the ring.
-         *
-         * @param msg Message to send
+         * @param msg Message.
          */
-        @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
-        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
-            assert msg != null;
-
-            assert ring.hasRemoteNodes();
-
-            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
-                msgLsnr.apply(msg);
-
+        private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
                 byte[] marshalledMsg = null;
 
@@ -2193,9 +2223,28 @@ class ServerImpl extends TcpDiscoveryImpl {
                         msgClone = msg;
                     }
 
+                    prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null);
+
                     clientMsgWorker.addMessage(msgClone);
                 }
             }
+        }
+
+        /**
+         * Sends message across the ring.
+         *
+         * @param msg Message to send
+         */
+        @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
+        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            assert ring.hasRemoteNodes();
+
+            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
+                msgLsnr.apply(msg);
+
+            sendMessageToClients(msg);
 
             Collection<TcpDiscoveryNode> failedNodes;
 
@@ -2810,7 +2859,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
                         }
                         else {
-                            if (ring.hasRemoteNodes())
+                            if (sendMessageToRemotes(reconMsg))
                                 sendMessageAcrossRing(reconMsg);
                         }
                     }
@@ -3052,8 +3101,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
+
+                if (nodeAddedMsg.verified())
+                    msgHist.add(nodeAddedMsg);
             }
-            else if (ring.hasRemoteNodes())
+            else if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -3155,8 +3207,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 log.debug("Failing reconnecting client node because failed to restore pending " +
                                     "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
 
-                            processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
-                                node.id(), node.internalOrder()));
+                            TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+                                node.id(), node.internalOrder());
+
+                            processNodeFailedMessage(nodeFailedMsg);
+
+                            if (nodeFailedMsg.verified())
+                                msgHist.add(nodeFailedMsg);
                         }
                     }
                     else if (log.isDebugEnabled())
@@ -3172,12 +3229,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 locNodeId + ", clientNodeId=" + nodeId + ']');
                     }
                     else {
-                        if (ring.hasRemoteNodes())
+                        if (sendMessageToRemotes(msg))
                             sendMessageAcrossRing(msg);
                     }
                 }
                 else {
-                    if (ring.hasRemoteNodes())
+                    if (sendMessageToRemotes(msg))
                         sendMessageAcrossRing(msg);
                 }
             }
@@ -3239,6 +3296,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
+                    if (addFinishMsg.verified())
+                        msgHist.add(addFinishMsg);
+
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
@@ -3249,7 +3309,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
                 // Local node already has node from message in local topology.
                 // Just pass it to coordinator via the ring.
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
 
                 if (log.isDebugEnabled())
@@ -3437,7 +3497,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -3572,7 +3632,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
 
             checkPendingCustomMessages();
@@ -3740,7 +3800,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes()) {
+            if (sendMessageToRemotes(msg)) {
                 try {
                     sendMessageAcrossRing(msg);
                 }
@@ -3761,6 +3821,19 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param msg Message to send.
+         * @return {@code True} if message should be sent across the ring.
+         */
+        private boolean sendMessageToRemotes(TcpDiscoveryAbstractMessage msg) {
+            if (ring.hasRemoteNodes())
+                return true;
+
+            sendMessageToClients(msg);
+
+            return false;
+        }
+
+        /**
          * Processes node failed message.
          *
          * @param msg Node failed message.
@@ -3892,7 +3965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 spi.stats.onNodeFailed();
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
             else {
                 if (log.isDebugEnabled())
@@ -4032,7 +4105,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -4098,7 +4171,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes()) {
+            if (sendMessageToRemotes(msg)) {
                 if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
                     !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
                     // Message is on its first ring or just created on coordinator.
@@ -4135,16 +4208,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         failedNode = failedNodes.contains(clientNode);
                                     }
 
-                                    if (!failedNode)
-                                        processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
-                                            clientNode.id(), clientNode.internalOrder()));
+                                    if (!failedNode) {
+                                        TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(
+                                            locNodeId, clientNode.id(), clientNode.internalOrder());
+
+                                        processNodeFailedMessage(nodeFailedMsg);
+
+                                        if (nodeFailedMsg.verified())
+                                            msgHist.add(nodeFailedMsg);
+                                    }
                                 }
                             }
                         }
                     }
                 }
 
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
             }
             else {
@@ -4351,7 +4430,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     notifyDiscoveryListener(msg);
                 }
 
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
             }
         }
@@ -4363,8 +4442,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 
-                while ((msg = pendingCustomMsgs.poll()) != null)
+                while ((msg = pendingCustomMsgs.poll()) != null) {
                     processCustomMessage(msg);
+
+                    if (msg.verified())
+                        msgHist.add(msg);
+                }
             }
         }
 
@@ -5293,19 +5376,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
                 else {
-                    try {
-                        if (log.isDebugEnabled())
-                            log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
-                                + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+                            + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
+                    assert topologyInitialized(msg) : msg;
 
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
-                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
-                    }
-                    finally {
-                        clearNodeAddedMessage(msg);
-                    }
+                    writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
             }
             catch (IgniteCheckedException | IOException e) {
@@ -5325,6 +5403,21 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param msg Message.
+         * @return {@code True} if topology initialized.
+         */
+        private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                if (clientNodeId.equals(addedMsg.node().id()))
+                    return addedMsg.topology() != null;
+            }
+
+            return true;
+        }
+
+        /**
          * @param res Ping result.
          */
         public void pingResult(boolean res) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index c50f791..875d18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -79,6 +79,17 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     }
 
     /**
+     * @param msg Message.
+     */
+    protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) {
+        this.id = msg.id;
+        this.verifierNodeId = msg.verifierNodeId;
+        this.topVer = msg.topVer;
+        this.flags = msg.flags;
+        this.pendingIdx = msg.pendingIdx;
+    }
+
+    /**
      * Gets creator node.
      *
      * @return Creator node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 5a7146d..6f8e14e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -55,6 +55,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     @GridToStringInclude
     private Collection<TcpDiscoveryNode> top;
 
+    /** */
+    @GridToStringInclude
+    private transient Collection<TcpDiscoveryNode> clientTop;
+
     /** Topology snapshots history. */
     private Map<Long, Collection<ClusterNode>> topHist;
 
@@ -93,6 +97,24 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+        super(msg);
+
+        this.node = msg.node;
+        this.msgs = msg.msgs;
+        this.discardMsgId = msg.discardMsgId;
+        this.discardCustomMsgId = msg.discardCustomMsgId;
+        this.top = msg.top;
+        this.clientTop = msg.clientTop;
+        this.topHist = msg.topHist;
+        this.newNodeDiscoData = msg.newNodeDiscoData;
+        this.oldNodesDiscoData = msg.oldNodesDiscoData;
+        this.gridStartTime = msg.gridStartTime;
+    }
+
+    /**
      * Gets newly added node.
      *
      * @return New node.
@@ -133,6 +155,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
      *
      * @param msgs Pending messages to send to new node.
      * @param discardMsgId Discarded message ID.
+     * @param discardCustomMsgId Discarded custom message ID.
      */
     public void messages(
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@@ -163,6 +186,22 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param top Topology at the moment when client joined.
+     */
+    public void clientTopology(Collection<TcpDiscoveryNode> top) {
+        assert top != null && !top.isEmpty() : top;
+
+        this.clientTop = top;
+    }
+
+    /**
+     * @return Topology at the moment when client joined.
+     */
+    public Collection<TcpDiscoveryNode> clientTopology() {
+        return clientTop;
+    }
+
+    /**
      * Gets topology snapshots history.
      *
      * @return Map with topology snapshots history.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index edd95e9..6131f54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -1128,6 +1128,39 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectDestroyCache() throws Exception {
+        clientMode = true;
+
+        Ignite client = startGrid(SRV_CNT);
+
+        CacheConfiguration<Integer, Integer> ccfg1 = new CacheConfiguration<>();
+        ccfg1.setName("cache1");
+
+        CacheConfiguration<Integer, Integer> ccfg2 = new CacheConfiguration<>();
+        ccfg2.setName("cache2");
+
+        final Ignite srv = grid(0);
+
+        srv.createCache(ccfg1);
+        srv.createCache(ccfg2).put(1, 1);
+
+        IgniteCache<Integer, Integer> cache = client.cache("cache2");
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srv.destroyCache("cache1");
+            }
+        });
+
+        cache.put(2, 2);
+
+        assertEquals(1, (Object)cache.get(1));
+        assertEquals(2, (Object)cache.get(2));
+    }
+
+    /**
      * @param client Client.
      * @param disconnectLatch Disconnect event latch.
      * @param reconnectLatch Reconnect event latch.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a6b5535..530ff61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -183,6 +184,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
 
         if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES)

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 51e76f6..659520b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -32,6 +32,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,6 +44,9 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
  * Test for reproducing problems during simultaneously Ignite instances stopping and cache requests executing.
  */
 public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
     /** Grid count. */
     private static final int GRID_CNT = 8;
 
@@ -55,6 +61,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
         OptimizedMarshaller marsh = new OptimizedMarshaller();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
index 2aa4280..37c5a6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
@@ -94,6 +94,8 @@ public class IgniteCacheClientReconnectTest extends GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
+
+        stopAllGrids();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 78fc590..242b12d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -113,13 +113,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testManyClients() throws Throwable {
-        manyClientsPutGet();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testManyClientsClientDiscovery() throws Throwable {
         clientDiscovery = true;
 
@@ -138,6 +131,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testManyClientsForceServerMode() throws Throwable {
+        manyClientsPutGet();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void manyClientsSequentially() throws Exception {
         client = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
new file mode 100644
index 0000000..6f0e887
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Client reconnect test in multi-threaded mode while cache operations are in progress.
+ */
+public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 14;
+
+    /** */
+    private static final int CLIENT_GRID_CNT = 14;
+
+    /** */
+    private static volatile boolean clientMode;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(clientMode);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+
+        Thread.sleep(5000);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMassiveServersShutdown1() throws Exception {
+        massiveServersShutdown(StopType.FAIL_EVENT);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void testMassiveServersShutdown2() throws Exception {
+        massiveServersShutdown(StopType.SIMULATE_FAIL);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMassiveServersShutdown3() throws Exception {
+        massiveServersShutdown(StopType.CLOSE);
+    }
+
+    /**
+     * @param stopType How to stop node.
+     * @throws Exception If any error occurs.
+     */
+    private void massiveServersShutdown(final StopType stopType) throws Exception {
+        clientMode = false;
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        clientMode = true;
+
+        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        // Starting a cache dynamically.
+        Ignite client = grid(GRID_CNT);
+
+        assertTrue(client.configuration().isClientMode());
+
+        CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setBackups(2);
+        cfg.setOffHeapMaxMemory(0);
+        cfg.setMemoryMode(OFFHEAP_TIERED);
+
+        IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg);
+
+        HashMap<String, Integer> put = new HashMap<>();
+
+        // Load some data.
+        for (int i = 0; i < 10_000; i++)
+            put.put(String.valueOf(i), i);
+
+        cache.putAll(put);
+
+        // Preparing client nodes and starting cache operations from them.
+        final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>();
+
+        for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+            clientIdx.add(i);
+
+        IgniteInternalFuture<?> clientsFut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = clientIdx.take();
+
+                    Ignite ignite = grid(idx);
+
+                    Thread.currentThread().setName("client-thread-" + ignite.name());
+
+                    assertTrue(ignite.configuration().isClientMode());
+
+                    IgniteCache<String, Integer> cache = ignite.cache(null);
+
+                    IgniteTransactions txs = ignite.transactions();
+
+                    Random rand = new Random();
+
+                    while (!done.get()) {
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
+
+                            tx.commit();
+                        }
+                        catch (ClusterTopologyException ex) {
+                            ex.retryReadyFuture().get();
+                        }
+                        catch (IgniteException | CacheException e) {
+                            if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
+                                IgniteClientDisconnectedException cause = X.cause(e,
+                                    IgniteClientDisconnectedException.class);
+
+                                assert cause != null;
+
+                                cause.reconnectFuture().get();
+                            }
+                            else if (X.hasCause(e, ClusterTopologyException.class)) {
+                                ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                                assert cause != null;
+
+                                cause.retryReadyFuture().get();
+                            }
+                            else
+                                throw e;
+                        }
+                    }
+
+                    return null;
+                }
+            },
+            CLIENT_GRID_CNT, "client-thread");
+
+        try {
+            // Killing half of the server nodes.
+            final int srvsToKill = GRID_CNT / 2;
+
+            final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>();
+
+            for (int i = 0; i < srvsToKill; i++)
+                victims.add(i);
+
+            final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>();
+
+            for (int i = srvsToKill; i < GRID_CNT; i++)
+                assassins.add(i);
+
+            IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Thread.sleep(5_000);
+
+                        Ignite assassin = grid(assassins.take());
+
+                        assertFalse(assassin.configuration().isClientMode());
+
+                        Ignite victim = grid(victims.take());
+
+                        assertFalse(victim.configuration().isClientMode());
+
+                        log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']');
+
+                        switch (stopType) {
+                            case CLOSE:
+                                victim.close();
+
+                                break;
+
+                            case FAIL_EVENT:
+                                UUID nodeId = victim.cluster().localNode().id();
+
+                                assassin.configuration().getDiscoverySpi().failNode(nodeId, null);
+
+                                break;
+
+                            case SIMULATE_FAIL:
+                                ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+                                break;
+
+                            default:
+                                fail();
+                        }
+
+                        return null;
+                    }
+                },
+                assassins.size(), "kill-thread");
+
+            srvsShutdownFut.get();
+
+            Thread.sleep(15_000);
+
+            done.set(true);
+
+            clientsFut.get();
+
+            awaitPartitionMapExchange();
+
+            for (int k = 0; k < 10_000; k++) {
+                String key = String.valueOf(k);
+
+                Object val = cache.get(key);
+
+                for (int i = srvsToKill; i < GRID_CNT; i++)
+                    assertEquals(val, ignite(i).cache(null).get(key));
+            }
+        }
+        finally {
+            done.set(true);
+        }
+    }
+
+    /**
+     * Ways in which a victim server node is stopped by the test.
+     */
+    enum StopType {
+        /** Victim node is stopped by calling {@code close()} on it. */
+        CLOSE,
+
+        /** Victim node's discovery SPI is asked to simulate node failure. */
+        SIMULATE_FAIL,
+
+        /** Another server node's discovery SPI is asked to fail the victim node by its ID. */
+        FAIL_EVENT
+    }
+}


Mime
View raw message