ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [21/50] [abbrv] ignite git commit: # ignite-901 client reconnect support
Date Mon, 07 Sep 2015 20:46:48 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..2bce637 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -228,6 +228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
+    /** */
+    private ConnectGateway connectGate;
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
@@ -248,7 +251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
-                    ses.send(nodeIdMsg);
+                    ses.send(nodeIdMessage());
                 }
             }
 
@@ -289,136 +292,163 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 }
             }
 
-            @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
+            /**
+             * @param ses Session.
+             * @param msg Message.
+             */
+            private void onFirstMessage(GridNioSession ses, Message msg) {
+                UUID sndId;
 
-                if (sndId == null) {
-                    assert ses.accepted();
+                if (msg instanceof NodeIdMessage)
+                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                else {
+                    assert msg instanceof HandshakeMessage : msg;
 
-                    if (msg instanceof NodeIdMessage)
-                        sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
-                    else {
-                        assert msg instanceof HandshakeMessage : msg;
+                    sndId = ((HandshakeMessage)msg).nodeId();
+                }
 
-                        sndId = ((HandshakeMessage)msg).nodeId();
-                    }
+                if (log.isDebugEnabled())
+                    log.debug("Remote node ID received: " + sndId);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Remote node ID received: " + sndId);
+                final UUID old = ses.addMeta(NODE_ID_META, sndId);
 
-                    final UUID old = ses.addMeta(NODE_ID_META, sndId);
+                assert old == null;
 
-                    assert old == null;
+                final ClusterNode rmtNode = getSpiContext().node(sndId);
 
-                    final ClusterNode rmtNode = getSpiContext().node(sndId);
+                if (rmtNode == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Close incoming connection, unknown node: " + sndId);
 
-                    if (rmtNode == null) {
-                        ses.close();
+                    ses.close();
 
-                        return;
-                    }
+                    return;
+                }
 
-                    ClusterNode locNode = getSpiContext().localNode();
+                ClusterNode locNode = getSpiContext().localNode();
 
-                    if (ses.remoteAddress() == null)
-                        return;
+                if (ses.remoteAddress() == null)
+                    return;
 
-                    GridCommunicationClient oldClient = clients.get(sndId);
+                GridCommunicationClient oldClient = clients.get(sndId);
 
-                    boolean hasShmemClient = false;
+                boolean hasShmemClient = false;
 
-                    if (oldClient != null) {
-                        if (oldClient instanceof GridTcpNioCommunicationClient) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received incoming connection when already connected " +
+                if (oldClient != null) {
+                    if (oldClient instanceof GridTcpNioCommunicationClient) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection when already connected " +
                                     "to this node, rejecting [locNode=" + locNode.id() +
                                     ", rmtNode=" + sndId + ']');
 
-                            ses.send(new RecoveryLastReceivedMessage(-1));
+                        ses.send(new RecoveryLastReceivedMessage(-1));
 
-                            return;
-                        }
-                        else {
-                            assert oldClient instanceof GridShmemCommunicationClient;
+                        return;
+                    }
+                    else {
+                        assert oldClient instanceof GridShmemCommunicationClient;
 
-                            hasShmemClient = true;
-                        }
+                        hasShmemClient = true;
                     }
+                }
 
-                    GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
+                GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
 
-                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
 
-                    assert msg instanceof HandshakeMessage : msg;
+                assert msg instanceof HandshakeMessage : msg;
 
-                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+                HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                    final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+                final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
 
-                    if (oldFut == null) {
-                        oldClient = clients.get(sndId);
+                if (oldFut == null) {
+                    oldClient = clients.get(sndId);
 
-                        if (oldClient != null) {
-                            if (oldClient instanceof GridTcpNioCommunicationClient) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Received incoming connection when already connected " +
+                    if (oldClient != null) {
+                        if (oldClient instanceof GridTcpNioCommunicationClient) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received incoming connection when already connected " +
                                         "to this node, rejecting [locNode=" + locNode.id() +
                                         ", rmtNode=" + sndId + ']');
 
-                                ses.send(new RecoveryLastReceivedMessage(-1));
+                            ses.send(new RecoveryLastReceivedMessage(-1));
 
-                                return;
-                            }
-                            else {
-                                assert oldClient instanceof GridShmemCommunicationClient;
+                            return;
+                        }
+                        else {
+                            assert oldClient instanceof GridShmemCommunicationClient;
 
-                                hasShmemClient = true;
-                            }
+                            hasShmemClient = true;
                         }
+                    }
 
-                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+                    boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                             new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received incoming connection from remote node " +
+                    if (log.isDebugEnabled())
+                        log.debug("Received incoming connection from remote node " +
                                 "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
 
-                        if (reserved) {
-                            try {
-                                GridTcpNioCommunicationClient client =
+                    if (reserved) {
+                        try {
+                            GridTcpNioCommunicationClient client =
                                     connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
-                                fut.onDone(client);
-                            }
-                            finally {
-                                clientFuts.remove(rmtNode.id(), fut);
-                            }
+                            fut.onDone(client);
+                        }
+                        finally {
+                            clientFuts.remove(rmtNode.id(), fut);
                         }
                     }
-                    else {
-                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Received incoming connection from remote node while " +
+                }
+                else {
+                    if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Received incoming connection from remote node while " +
                                     "connecting to this node, rejecting [locNode=" + locNode.id() +
                                     ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
                                     ", rmtNodeOrder=" + rmtNode.order() + ']');
-                            }
-
-                            ses.send(new RecoveryLastReceivedMessage(-1));
                         }
-                        else {
-                            boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+
+                        ses.send(new RecoveryLastReceivedMessage(-1));
+                    }
+                    else {
+                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                                 new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
 
-                            if (reserved) {
-                                GridTcpNioCommunicationClient client =
+                        if (reserved) {
+                            GridTcpNioCommunicationClient client =
                                     connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
 
-                                fut.onDone(client);
-                            }
+                            fut.onDone(client);
                         }
                     }
                 }
+            }
+
+            @Override public void onMessage(GridNioSession ses, Message msg) {
+                UUID sndId = ses.meta(NODE_ID_META);
+
+                if (sndId == null) {
+                    assert ses.accepted() : ses;
+
+                    if (!connectGate.tryEnter()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Close incoming connection, failed to enter gateway.");
+
+                        ses.close();
+
+                        return;
+                    }
+
+                    try {
+                        onFirstMessage(ses, msg);
+                    }
+                    finally {
+                        connectGate.leave();
+                    }
+                }
                 else {
                     rcvdMsgsCnt.increment();
 
@@ -700,9 +730,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
-    /** Local node ID message. */
-    private NodeIdMessage nodeIdMsg;
-
     /** Received messages count. */
     private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
 
@@ -739,8 +766,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
-            assert evt instanceof DiscoveryEvent;
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+            assert evt instanceof DiscoveryEvent : evt;
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
 
             onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
         }
@@ -1237,8 +1264,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
-        nodeIdMsg = new NodeIdMessage(getLocalNodeId());
-
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1346,6 +1371,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
 
+        connectGate = new ConnectGateway();
+
         if (shmemSrv != null) {
             shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
 
@@ -1608,6 +1635,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             // Safety.
             ctxInitLatch.countDown();
 
+        if (connectGate != null)
+            connectGate.stopped();
+
         // Force closing.
         for (GridCommunicationClient client : clients.values())
             client.forceClose();
@@ -1617,6 +1647,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         getSpiContext().removeLocalEventListener(discoLsnr);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+        connectGate.disconnected(reconnectFut);
+
+        for (GridCommunicationClient client : clients.values())
+            client.forceClose();
+
+        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+            "Failed to connect client node disconnected.");
+
+        for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
+            clientFut.onDone(err);
+
+        recoveryDescs.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClientReconnected(boolean clusterRestarted) {
+        connectGate.reconnected();
+    }
+
     /**
      * @param nodeId Left node ID.
      */
@@ -1666,10 +1717,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
 
-        UUID locNodeId = getLocalNodeId();
-
-        if (node.id().equals(locNodeId))
-            notifyListener(locNodeId, msg, NOOP);
+        if (node.id().equals(getLocalNode().id()))
+            notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;
 
@@ -1834,7 +1883,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
         }
 
-        return createTcpClient(node);
+        connectGate.enter();
+
+        try {
+            return createTcpClient(node);
+        }
+        finally {
+            connectGate.leave();
+        }
     }
 
     /**
@@ -2208,7 +2264,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
+                        HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
                             recovery.incrementConnectCount(),
                             recovery.receivedCount());
 
@@ -2228,7 +2284,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ch.write(buf);
                     }
                     else
-                        ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+                        ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
 
                     if (recovery != null) {
                         if (log.isDebugEnabled())
@@ -2355,6 +2411,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         getExceptionRegistry().onException(msg, e);
     }
 
+    /**
+     * @return New node ID message created from the local node ID read at call time.
+     */
+    private NodeIdMessage nodeIdMessage() {
+        return new NodeIdMessage(getLocalNode().id());
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpCommunicationSpi.class, this);
@@ -2692,10 +2755,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
             ClusterNode node = recoveryDesc.node();
 
-            if (clients.containsKey(node.id()) ||
-                !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
-                !getSpiContext().pingNode(node.id()))
+            try {
+                if (clients.containsKey(node.id()) ||
+                    !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+                    !getSpiContext().pingNode(node.id()))
+                    return;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to ping node, client disconnected.");
+
                 return;
+            }
 
             try {
                 if (log.isDebugEnabled())
@@ -2860,15 +2931,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
+                UUID id = getLocalNode().id();
+
+                NodeIdMessage msg = new NodeIdMessage(id);
+
                 out.write(U.IGNITE_HEADER);
                 out.write(NODE_ID_MSG_TYPE);
-                out.write(nodeIdMsg.nodeIdBytes);
+                out.write(msg.nodeIdBytes);
 
                 out.flush();
 
                 if (log.isDebugEnabled())
-                    log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId="
-                        + rmtNodeId + ']');
+                    log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
             }
             catch (IOException e) {
                 throw new IgniteCheckedException("Failed to perform handshake.", e);
@@ -3082,6 +3156,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param nodeId Node ID.
          */
         private NodeIdMessage(UUID nodeId) {
+            assert nodeId != null;
+
             nodeIdBytes = U.uuidToBytes(nodeId);
 
             nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
@@ -3131,4 +3207,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return S.toString(NodeIdMessage.class, this);
         }
     }
+
+    /**
+     * Connection gate: while {@link #err} is set, {@link #enter()} throws it and {@link #tryEnter()} fails.
+     */
+    private class ConnectGateway {
+        /** Lock held around every read and write of {@link #err}. */
+        private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+
+        /** Error thrown by {@link #enter()} while set; {@code null} when entering is allowed. */
+        private IgniteException err;
+
+        /**
+         * Takes the read lock; if an error is set, releases the lock and throws it. Pair with {@link #leave()}.
+         */
+        void enter() {
+            lock.readLock();
+
+            if (err != null) {
+                lock.readUnlock();
+
+                throw err;
+            }
+        }
+
+        /**
+         * @return {@code True} if entered gateway (read lock stays held), {@code false} if an error is set.
+         */
+        boolean tryEnter() {
+            lock.readLock();
+
+            boolean res = err == null;
+
+            if (!res)
+                lock.readUnlock();
+
+            return res;
+        }
+
+        /**
+         * Releases the read lock taken by {@link #enter()} or by a successful {@link #tryEnter()}.
+         */
+        void leave() {
+            lock.readUnlock();
+        }
+
+        /**
+         * @param reconnectFut Reconnect future passed to the disconnect error that is stored under the write lock.
+         */
+        void disconnected(IgniteFuture<?> reconnectFut) {
+            lock.writeLock();
+
+            err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
+
+            lock.writeUnlock();
+        }
+
+        /**
+         * Under the write lock, clears the error only if it is a client-disconnected error; other errors are kept.
+         */
+        void reconnected() {
+            lock.writeLock();
+
+            try {
+                if (err instanceof IgniteClientDisconnectedException)
+                    err = null;
+            }
+            finally {
+                lock.writeUnlock();
+            }
+        }
+
+        /**
+         * Sets the node-stopped error. NOTE(review): writes {@link #err} under the read lock only - confirm intended.
+         */
+        void stopped() {
+            lock.readLock();
+
+            err = new IgniteException("Failed to connect, node stopped.");
+
+            lock.readUnlock();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index 46d6716..038ea59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange {
     /**
      * Notifies discovery manager about data received from remote node.
      *
-     * @param joiningNodeId Remote node ID.
+     * @param joiningNodeId ID of new node that joins topology.
+     * @param nodeId ID of the node provided data.
      * @param data Collection of discovery data objects from different components.
      */
     public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..572ba2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -41,6 +42,7 @@ import java.util.concurrent.atomic.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*;
 
 /**
  *
@@ -71,7 +73,7 @@ class ClientImpl extends TcpDiscoveryImpl {
     private SocketReader sockReader;
 
     /** */
-    private boolean segmented;
+    private volatile State state;
 
     /** Last message ID. */
     private volatile IgniteUuid lastMsgId;
@@ -94,6 +96,10 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     protected MessageWorker msgWorker;
 
+    /** */
+    @GridToStringExclude
+    private int joinCnt;
+
     /**
      * @param adapter Adapter.
      */
@@ -157,6 +163,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         locNode = spi.locNode;
 
+        // Marshal credentials for backward compatibility and security.
+        marshalCredentials(locNode);
+
         sockWriter = new SocketWriter();
         sockWriter.start();
 
@@ -258,23 +267,36 @@ class ClientImpl extends TcpDiscoveryImpl {
             if (oldFut != null)
                 fut = oldFut;
             else {
-                if (spi.getSpiContext().isStopping()) {
+                State state = this.state;
+
+                if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) {
                     if (pingFuts.remove(nodeId, fut))
                         fut.onDone(false);
 
                     return false;
                 }
+                else if (state == DISCONNECTED) {
+                    if (pingFuts.remove(nodeId, fut))
+                        fut.onDone(new IgniteClientDisconnectedCheckedException(null,
+                            "Failed to ping node, client node disconnected."));
+                }
+                else {
+                    final GridFutureAdapter<Boolean> finalFut = fut;
 
-                final GridFutureAdapter<Boolean> finalFut = fut;
-
-                timer.schedule(new TimerTask() {
-                    @Override public void run() {
-                        if (pingFuts.remove(nodeId, finalFut))
-                            finalFut.onDone(false);
-                    }
-                }, spi.netTimeout);
+                    timer.schedule(new TimerTask() {
+                        @Override public void run() {
+                            if (pingFuts.remove(nodeId, finalFut)) {
+                                if (ClientImpl.this.state == DISCONNECTED)
+                                    finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
+                                        "Failed to ping node, client node disconnected."));
+                                else
+                                    finalFut.onDone(false);
+                            }
+                        }
+                    }, spi.netTimeout);
 
-                sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+                    sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+                }
             }
         }
 
@@ -285,7 +307,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             return false;
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSpiException(e); // Should newer occur.
+            throw new IgniteSpiException(e);
         }
     }
 
@@ -325,8 +347,13 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
-        if (segmented)
-            throw new IgniteException("Failed to send custom message: client is disconnected");
+        State state = this.state;
+
+        if (state == SEGMENTED)
+            throw new IgniteException("Failed to send custom message: client is segmented.");
+
+        if (state == DISCONNECTED)
+            throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
 
         try {
             sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
@@ -361,14 +388,11 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
+    @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
 
-        // Marshal credentials for backward compatibility and security.
-        marshalCredentials(locNode);
-
         while (true) {
             if (Thread.currentThread().isInterrupted())
                 throw new InterruptedException();
@@ -400,7 +424,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 InetSocketAddress addr = it.next();
 
-                T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
+                T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
                     it.remove();
@@ -414,7 +438,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 switch (sockAndRes.get2()) {
                     case RES_OK:
-                        return sock;
+                        return new T2<>(sock, sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
@@ -445,9 +469,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /**
      * @param recon {@code True} if reconnects.
      * @param addr Address.
-     * @return Socket and connect response.
+     * @return Socket, connect response and client acknowledge support flag.
      */
-    @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+    @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
         assert addr != null;
 
         if (log.isDebugEnabled())
@@ -493,9 +517,18 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 tstamp = U.currentTimeMillis();
 
-                TcpDiscoveryAbstractMessage msg = recon ?
-                    new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
-                    new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+                TcpDiscoveryAbstractMessage msg;
+
+                if (!recon) {
+                    TcpDiscoveryNode node = locNode;
+
+                    if (locNode.order() > 0)
+                        node = locNode.clientReconnectNode();
+
+                    msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+                }
+                else
+                    msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
 
                 msg.client(true);
 
@@ -507,7 +540,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
+                return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -786,10 +819,16 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         spi.stats.onMessageReceived(msg);
 
-                        if (spi.ensured(msg) && joinLatch.getCount() == 0L)
-                            lastMsgId = msg.id();
+                        boolean ack = msg instanceof TcpDiscoveryClientAckResponse;
+
+                        if (!ack) {
+                            if (spi.ensured(msg) && joinLatch.getCount() == 0L)
+                                lastMsgId = msg.id();
 
-                        msgWorker.addMessage(msg);
+                            msgWorker.addMessage(msg);
+                        }
+                        else
+                            sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
                     }
                 }
                 catch (IOException e) {
@@ -823,8 +862,14 @@ class ClientImpl extends TcpDiscoveryImpl {
         private Socket sock;
 
         /** */
+        private boolean clientAck;
+
+        /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
+        /** */
+        private TcpDiscoveryAbstractMessage unackedMsg;
+
         /**
          *
          */
@@ -845,11 +890,16 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /**
          * @param sock Socket.
+         * @param clientAck {@code True} if server supports client message acknowledgement.
          */
-        private void setSocket(Socket sock) {
+        private void setSocket(Socket sock, boolean clientAck) {
             synchronized (mux) {
                 this.sock = sock;
 
+                this.clientAck = clientAck;
+
+                unackedMsg = null;
+
                 mux.notifyAll();
             }
         }
@@ -863,6 +913,21 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @param res Acknowledge response.
+         */
+        void ackReceived(TcpDiscoveryClientAckResponse res) {
+            synchronized (mux) {
+                if (unackedMsg != null) {
+                    assert unackedMsg.id().equals(res.messageId()) : unackedMsg;
+
+                    unackedMsg = null;
+                }
+
+                mux.notifyAll();
+            }
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             TcpDiscoveryAbstractMessage msg = null;
@@ -892,10 +957,43 @@ class ClientImpl extends TcpDiscoveryImpl {
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
                     msgLsnr.apply(msg);
 
+                boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
+
                 try {
+                    if (ack) {
+                        synchronized (mux) {
+                            assert unackedMsg == null : unackedMsg;
+
+                            unackedMsg = msg;
+                        }
+                    }
+
                     spi.writeToSocket(sock, msg);
 
                     msg = null;
+
+                    if (ack) {
+                        long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+
+                        TcpDiscoveryAbstractMessage unacked;
+
+                        synchronized (mux) {
+                            for (long left; unackedMsg != null && (left = waitEnd - U.currentTimeMillis()) > 0; )
+                                mux.wait(left); // Object.wait takes a relative timeout, not an absolute deadline.
+
+                            unacked = unackedMsg;
+
+                            unackedMsg = null;
+                        }
+
+                        if (unacked != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to get acknowledge for message, will try to reconnect " +
+                                "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']');
+
+                            throw new IOException("Failed to get acknowledge for message: " + unacked);
+                        }
+                    }
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -926,6 +1024,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         private volatile Socket sock;
 
         /** */
+        private boolean clientAck;
+
+        /** */
         private boolean join;
 
         /**
@@ -948,8 +1049,6 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            assert !segmented;
-
             boolean success = false;
 
             Exception err = null;
@@ -958,11 +1057,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             long startTime = U.currentTimeMillis();
 
+            if (log.isDebugEnabled())
+                log.debug("Started reconnect process [join=" + join + ", timeout=" + timeout + ']');
+
             try {
                 while (true) {
-                    sock = joinTopology(true, timeout);
+                    T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
 
-                    if (sock == null) {
+                    if (joinRes == null) {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
@@ -970,11 +1072,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                         else
                             U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                                "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
 
                         return;
                     }
 
+                    sock = joinRes.get1();
+                    clientAck = joinRes.get2();
+
                     if (isInterrupted())
                         throw new InterruptedException();
 
@@ -999,6 +1104,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
 
                                 if (res.creatorNodeId().equals(getLocalNodeId())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Received reconnect response [success=" + res.success() +
+                                            ", msg=" + msg + ']');
+
                                     if (res.success()) {
                                         msgWorker.addMessage(res);
 
@@ -1008,9 +1117,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                                         }
 
                                         success = true;
-                                    }
 
-                                    return;
+                                        return;
+                                    }
+                                    else
+                                        return;
                                 }
                             }
                             else if (spi.ensured(msg)) {
@@ -1081,6 +1192,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private Reconnector reconnector;
 
+        /** */
+        private boolean nodeAdded;
+
         /**
          *
          */
@@ -1091,45 +1205,37 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** {@inheritDoc} */
         @SuppressWarnings("InfiniteLoopStatement")
         @Override protected void body() throws InterruptedException {
+            state = STARTING;
+
             spi.stats.onJoinStarted();
 
             try {
-                final Socket sock = joinTopology(false, spi.joinTimeout);
-
-                if (sock == null) {
-                    joinError(new IgniteSpiException("Join process timed out."));
-
-                    return;
-                }
-
-                currSock = sock;
-
-                sockWriter.setSocket(sock);
-
-                if (spi.joinTimeout > 0) {
-                    timer.schedule(new TimerTask() {
-                        @Override public void run() {
-                            if (joinLatch.getCount() > 0)
-                                queue.add(JOIN_TIMEOUT);
-                        }
-                    }, spi.joinTimeout);
-                }
-
-                sockReader.setSocket(sock, locNode.clientRouterNodeId());
+                tryJoin();
 
                 while (true) {
                     Object msg = queue.take();
 
                     if (msg == JOIN_TIMEOUT) {
-                        if (joinLatch.getCount() > 0) {
+                        if (state == STARTING) {
                             joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
 
                             break;
                         }
+                        else if (state == DISCONNECTED) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to reconnect, local node segmented " +
+                                    "[joinTimeout=" + spi.joinTimeout + ']');
+
+                            state = SEGMENTED;
+
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        }
                     }
                     else if (msg == SPI_STOP) {
+                        state = STOPPED;
+
                         assert spi.getSpiContext().isStopping();
 
                         if (currSock != null) {
@@ -1148,7 +1254,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             boolean join = joinLatch.getCount() > 0;
 
-                            if (spi.getSpiContext().isStopping() || segmented) {
+                            if (spi.getSpiContext().isStopping() || state == SEGMENTED) {
                                 leaveLatch.countDown();
 
                                 if (join) {
@@ -1158,6 +1264,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 }
                             }
                             else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Connection closed, will try to restore connection.");
+
                                 assert reconnector == null;
 
                                 final Reconnector reconnector = new Reconnector(join);
@@ -1167,19 +1276,64 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                     else if (msg == SPI_RECONNECT_FAILED) {
-                        if (!segmented) {
-                            segmented = true;
+                        reconnector.cancel();
+                        reconnector.join();
 
-                            reconnector.cancel();
-                            reconnector.join();
+                        reconnector = null;
 
-                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        if (spi.isClientReconnectDisabled()) {
+                            if (state != SEGMENTED && state != STOPPED) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Failed to restore closed connection, reconnect disabled, " +
+                                        "local node segmented [networkTimeout=" + spi.netTimeout + ']');
+                                }
+
+                                state = SEGMENTED;
+
+                                notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                            }
+                        }
+                        else {
+                            if (state == STARTING || state == CONNECTED) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Failed to restore closed connection, will try to reconnect " +
+                                        "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']');
+                                }
+
+                                state = DISCONNECTED;
+
+                                nodeAdded = false;
+
+                                IgniteClientDisconnectedCheckedException err =
+                                    new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+                                    "client node disconnected.");
+
+                                for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                                    GridFutureAdapter<Boolean> fut = e.getValue();
+
+                                    if (pingFuts.remove(e.getKey(), fut))
+                                        fut.onDone(err);
+                                }
+
+                                notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+                            }
+
+                            UUID newId = UUID.randomUUID();
+
+                            if (log.isInfoEnabled()) {
+                                log.info("Client node disconnected from cluster, will try to reconnect with new id " +
+                                    "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
+                            }
+
+                            locNode.onClientDisconnected(newId);
+
+                            tryJoin();
                         }
                     }
                     else {
                         TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
 
-                        if (joinLatch.getCount() > 0) {
+                        if (joining()) {
                             IgniteSpiException err = null;
 
                             if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1190,7 +1344,15 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
 
                             if (err != null) {
-                                joinError(err);
+                                if (state == DISCONNECTED) {
+                                    U.error(log, "Failed to reconnect, segment local node.", err);
+
+                                    state = SEGMENTED;
+
+                                    notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                                }
+                                else
+                                    joinError(err);
 
                                 break;
                             }
@@ -1215,6 +1377,48 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @throws InterruptedException If interrupted.
+         */
+        private void tryJoin() throws InterruptedException {
+            assert state == DISCONNECTED || state == STARTING : state;
+
+            boolean join = state == STARTING;
+
+            joinCnt++;
+
+            T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+
+            if (joinRes == null) {
+                if (join)
+                    joinError(new IgniteSpiException("Join process timed out."));
+                else {
+                    state = SEGMENTED;
+
+                    notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                }
+
+                return;
+            }
+
+            currSock = joinRes.get1();
+
+            sockWriter.setSocket(joinRes.get1(), joinRes.get2());
+
+            if (spi.joinTimeout > 0) {
+                final int joinCnt0 = joinCnt;
+
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (joinCnt == joinCnt0 && joining())
+                            queue.add(JOIN_TIMEOUT);
+                    }
+                }, spi.joinTimeout);
+            }
+
+            sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
+        }
+
+        /**
          * @param msg Message.
          */
         protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
@@ -1246,6 +1450,22 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @return {@code True} if the client is in the process of joining.
+         */
+        private boolean joining() {
+            ClientImpl.State state = ClientImpl.this.state;
+
+            return state == STARTING || state == DISCONNECTED;
+        }
+
+        /**
+         * @return {@code True} if client disconnected.
+         */
+        private boolean disconnected() {
+            return state == DISCONNECTED;
+        }
+
+        /**
          * @param msg Message.
          */
         private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
@@ -1257,12 +1477,15 @@ class ClientImpl extends TcpDiscoveryImpl {
             UUID newNodeId = node.id();
 
             if (getLocalNodeId().equals(newNodeId)) {
-                if (joinLatch.getCount() > 0) {
+                if (joining()) {
                     Collection<TcpDiscoveryNode> top = msg.topology();
 
                     if (top != null) {
                         spi.gridStartTime = msg.gridStartTime();
 
+                        if (disconnected())
+                            rmtNodes.clear();
+
                         for (TcpDiscoveryNode n : top) {
                             if (n.order() > 0)
                                 n.visible(true);
@@ -1272,6 +1495,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         topHist.clear();
 
+                        nodeAdded = true;
+
                         if (msg.topologyHistory() != null)
                             topHist.putAll(msg.topologyHistory());
                     }
@@ -1309,7 +1534,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
 
             if (getLocalNodeId().equals(msg.nodeId())) {
-                if (joinLatch.getCount() > 0) {
+                if (joining()) {
                     Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
 
                     if (dataMap != null) {
@@ -1324,13 +1549,22 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     locNode.order(topVer);
 
-                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
+                    Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
+
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
+
+                    boolean disconnected = disconnected();
+
+                    state = CONNECTED;
+
+                    if (disconnected)
+                        notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+                    else
+                        spi.stats.onJoinFinished();
 
                     joinErr.set(null);;
 
                     joinLatch.countDown();
-
-                    spi.stats.onJoinFinished();
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1438,7 +1672,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @return {@code True} if received node added message for local node.
          */
         private boolean nodeAdded() {
-            return !topHist.isEmpty();
+            return nodeAdded;
         }
 
         /**
@@ -1539,7 +1773,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     currSock = reconnector.sock;
 
-                    sockWriter.setSocket(currSock);
+                    sockWriter.setSocket(currSock, reconnector.clientAck);
                     sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 
                     reconnector = null;
@@ -1583,7 +1817,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            if (msg.verified() && joinLatch.getCount() == 0) {
+            if (msg.verified() && state == CONNECTED) {
                 DiscoverySpiListener lsnr = spi.lsnr;
 
                 if (lsnr != null) {
@@ -1719,4 +1953,24 @@ class ClientImpl extends TcpDiscoveryImpl {
             this.sock = sock;
         }
     }
+
+    /**
+     *
+     */
+    enum State {
+        /** */
+        STARTING,
+
+        /** */
+        CONNECTED,
+
+        /** */
+        DISCONNECTED,
+
+        /** */
+        SEGMENTED,
+
+        /** */
+        STOPPED
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d51293e..1a28e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1447,6 +1447,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Heartbeats sender has been started.");
 
+            UUID nodeId = getConfiguredNodeId();
+
             while (!isInterrupted()) {
                 if (spiStateCopy() != CONNECTED) {
                     if (log.isDebugEnabled())
@@ -1455,7 +1457,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
+                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
 
                 msg.verify(getLocalNodeId());
 
@@ -1593,39 +1595,47 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Addresses registered in IP finder.
                 Collection<InetSocketAddress> regAddrs = spi.registeredAddresses();
 
-                // Remove all addresses that belong to alive nodes, leave dead-node addresses.
-                Collection<InetSocketAddress> rmvAddrs = F.view(
-                    regAddrs,
-                    F.notContains(currAddrs),
-                    new P1<InetSocketAddress>() {
-                        private final Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();
+                P1<InetSocketAddress> p = new P1<InetSocketAddress>() {
+                    private final Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();
 
-                        @Override public boolean apply(InetSocketAddress addr) {
-                            Boolean res = pingResMap.get(addr);
+                    @Override public boolean apply(InetSocketAddress addr) {
+                        Boolean res = pingResMap.get(addr);
 
-                            if (res == null) {
-                                try {
-                                    res = pingNode(addr, null).get1() != null;
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to ping node [addr=" + addr +
-                                            ", err=" + e.getMessage() + ']');
-
-                                    res = false;
-                                }
-                                finally {
-                                    pingResMap.put(addr, res);
-                                }
+                        if (res == null) {
+                            try {
+                                res = pingNode(addr, null).get1() != null;
                             }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to ping node [addr=" + addr +
+                                        ", err=" + e.getMessage() + ']');
 
-                            return !res;
+                                res = false;
+                            }
+                            finally {
+                                pingResMap.put(addr, res);
+                            }
                         }
+
+                        return !res;
                     }
-                );
+                };
+
+                ArrayList<InetSocketAddress> rmvAddrs = null;
+
+                for (InetSocketAddress addr : regAddrs) {
+                    boolean rmv = !F.contains(currAddrs, addr) && p.apply(addr);
+
+                    if (rmv) {
+                        if (rmvAddrs == null)
+                            rmvAddrs = new ArrayList<>();
+
+                        rmvAddrs.add(addr);
+                    }
+                }
 
                 // Unregister dead-nodes addresses.
-                if (!rmvAddrs.isEmpty()) {
+                if (rmvAddrs != null) {
                     spi.ipFinder.unregisterAddresses(rmvAddrs);
 
                     if (log.isDebugEnabled())
@@ -4077,7 +4087,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            UUID locNodeId = getLocalNodeId();
+            UUID locNodeId = getConfiguredNodeId();
 
             ClientMessageWorker clientMsgWrk = null;
 
@@ -4170,6 +4180,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 
+                    if (req.client())
+                        res.clientAck(true);
+
                     spi.writeToSocket(sock, res);
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
@@ -4313,7 +4326,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (state == CONNECTED) {
                                     spi.writeToSocket(msg, sock, RES_OK);
 
-                                    if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                                    if (clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
 
                                     msgWorker.addMessage(msg);
@@ -4457,7 +4470,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         msgWorker.addMessage(msg);
 
                         // Send receipt back.
-                        if (clientMsgWrk == null)
+                        if (clientMsgWrk != null) {
+                            TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+
+                            ack.verify(locNodeId);
+
+                            clientMsgWrk.addMessage(ack);
+                        }
+                        else
                             spi.writeToSocket(msg, sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
@@ -4567,8 +4587,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 msg.responded(true);
 
-                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) {
+                    clientMsgWrk.clientVersion(U.productVersion(msg.node()));
+
                     clientMsgWrk.start();
+                }
 
                 msgWorker.addMessage(msg);
 
@@ -4679,6 +4702,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
 
+        /** */
+        private IgniteProductVersion clientVer;
+
         /**
          * @param sock Socket.
          * @param clientNodeId Node ID.
@@ -4691,6 +4717,13 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param clientVer Client version.
+         */
+        void clientVersion(IgniteProductVersion clientVer) {
+            this.clientVer = clientVer;
+        }
+
+        /**
          * @return Current client metrics.
          */
         ClusterMetrics metrics() {
@@ -4709,17 +4742,40 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 assert msg.verified() : msg;
 
-                if (log.isDebugEnabled())
-                    log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
-                        + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+                if (msg instanceof TcpDiscoveryClientAckResponse) {
+                    if (clientVer == null) {
+                        ClusterNode node = spi.getNode(clientNodeId);
 
-                try {
-                    prepareNodeAddedMessage(msg, clientNodeId, null, null);
+                        if (node != null)
+                            clientVer = IgniteUtils.productVersion(node);
+                        else if (log.isDebugEnabled())
+                            log.debug("Skip sending message ack to client, fail to get client node " +
+                                "[sock=" + sock + ", locNodeId=" + getLocalNodeId() +
+                                ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+                    }
+
+                    if (clientVer != null &&
+                        clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
+                                + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                    writeToSocket(sock, msg);
+                        writeToSocket(sock, msg);
+                    }
                 }
-                finally {
-                    clearNodeAddedMessage(msg);
+                else {
+                    try {
+                        if (log.isDebugEnabled())
+                            log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+                                + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null);
+
+                        writeToSocket(sock, msg);
+                    }
+                    finally {
+                        clearNodeAddedMessage(msg);
+                    }
                 }
             }
             catch (IgniteCheckedException | IOException e) {
@@ -4829,7 +4885,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             if (log.isDebugEnabled())
-                log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
+                log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
                 TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..c271b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -112,7 +112,14 @@ abstract class TcpDiscoveryImpl {
      * @return Local node ID.
      */
     public UUID getLocalNodeId() {
-        return spi.getLocalNodeId();
+        return spi.locNode.id(); // ID of the local node object, which onClientDisconnected() may reassign.
+    }
+
+    /**
+     * @return Configured node ID (actual node ID can be different if client reconnects).
+     */
+    public UUID getConfiguredNodeId() {
+        return spi.cfgNodeId; // Assigned by TcpDiscoverySpi from ignite.configuration().getNodeId().
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..431d198 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -260,6 +260,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Local node. */
     protected TcpDiscoveryNode locNode;
 
+    /** Node ID from Ignite configuration (local node ID can differ after client reconnect). */
+    protected UUID cfgNodeId;
+
     /** Local host. */
     protected InetAddress locHost;
 
@@ -327,6 +330,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** */
     private boolean forceSrvMode;
 
+    /** If {@code true}, client does not try to reconnect after server detected its failure. */
+    private boolean clientReconnectDisabled;
+
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         return impl.getSpiState();
@@ -417,6 +423,29 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * If {@code true}, the client does not try to reconnect after
+     * the server has detected the client node failure.
+     *
+     * @return Client reconnect disabled flag ({@code false} unless set explicitly).
+     */
+    public boolean isClientReconnectDisabled() {
+        return clientReconnectDisabled;
+    }
+
+    /**
+     * Sets client reconnect disabled flag.
+     * <p>
+     * If {@code true}, the client does not try to reconnect after
+     * the server has detected the client node failure.
+     *
+     * @param clientReconnectDisabled New value of the client reconnect disabled flag.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setClientReconnectDisabled(boolean clientReconnectDisabled) {
+        this.clientReconnectDisabled = clientReconnectDisabled;
+    }
+
+    /**
      * Inject resources
      *
      * @param ignite Ignite.
@@ -844,7 +873,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         }
 
         locNode = new TcpDiscoveryNode(
-            getLocalNodeId(),
+            ignite.configuration().getNodeId(),
             addrs.get1(),
             addrs.get2(),
             srvPort,
@@ -1615,6 +1644,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
                 mcastIpFinder.setLocalAddress(locAddr);
         }
 
+        cfgNodeId = ignite.configuration().getNodeId();
+
         impl.spiStart(gridName);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 22f56c3..032cf01 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -441,6 +441,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         this.clientRouterNodeId = clientRouterNodeId;
     }
 
+    /**
+     * @param newId New node ID that replaces the current ID of this node.
+     */
+    public void onClientDisconnected(UUID newId) {
+        id = newId;
+    }
+
+    /**
+     * @return New node object built from this node's fields, for use in a client reconnect request.
+     */
+    public TcpDiscoveryNode clientReconnectNode() {
+        TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver);
+
+        node.attrs = attrs; // The same attributes object is shared with the copy (not cloned).
+        node.clientRouterNodeId = clientRouterNodeId;
+
+        return node;
+    }
+
     /** {@inheritDoc} */
     @Override public int compareTo(@Nullable TcpDiscoveryNode node) {
         if (node == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 21dbf4f..6f52152 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -40,6 +40,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     /** */
     protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
 
+    /** Position of the flag accessed by {@code TcpDiscoveryHandshakeResponse.clientAck()}. */
+    protected static final int CLIENT_ACK_FLAG_POS = 4;
+
     /** Sender of the message (transient). */
     private transient UUID sndNodeId;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
new file mode 100644
index 0000000..ce3943a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Discovery message that acknowledges another message, referenced by its ID.
+ */
+public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Product version that client version is compared with before a message ack is sent to the client. */
+    public static final IgniteProductVersion CLIENT_ACK_SINCE_VERSION = IgniteProductVersion.fromString("1.4.1");
+
+    /** ID of the acknowledged message. */
+    private final IgniteUuid msgId;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param msgId Message ID to ack.
+     */
+    public TcpDiscoveryClientAckResponse(UUID creatorNodeId, IgniteUuid msgId) {
+        super(creatorNodeId);
+
+        this.msgId = msgId;
+    }
+
+    /**
+     * @return Acknowledged message ID.
+     */
+    public IgniteUuid messageId() {
+        return msgId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean highPriority() {
+        return true; // Acks are always reported as high priority.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryClientAckResponse.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 5c2f798..ac4be50 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -61,6 +61,20 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
         this.order = order;
     }
 
+    /**
+     * @return {@code True} if server supports client message acknowledgement.
+     */
+    public boolean clientAck() {
+        return getFlag(CLIENT_ACK_FLAG_POS);
+    }
+
+    /**
+     * @param clientAck {@code True} if server supports client message acknowledgement.
+     */
+    public void clientAck(boolean clientAck) {
+        setFlag(CLIENT_ACK_FLAG_POS, clientAck);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 7a88426..000782a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);
 
-        String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId();
+        String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId();
 
         try {
             dir = U.resolveWorkDirectory(path, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index abc9109..bf499c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.*;
@@ -104,8 +105,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
      * Test kernal gateway that always return uninitialized user stack trace.
      */
     private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() {
-        @Override public void lightCheck() throws IllegalStateException {}
-
         @Override public void readLock() throws IllegalStateException {}
 
         @Override public void readLockAnyway() {}
@@ -122,10 +121,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
 
         @Override public void writeUnlock() {}
 
-        @Override public void addStopListener(Runnable lsnr) {}
-
-        @Override public void removeStopListener(Runnable lsnr) {}
-
         @Override public String userStackTrace() {
             return null;
         }
@@ -133,5 +128,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
         @Override public boolean tryWriteLock(long timeout) {
             return false;
         }
+
+        @Override public GridFutureAdapter<?> onDisconnected() {
+            return null;
+        }
+
+        @Override public void onReconnected() {
+            // No-op.
+        }
     };
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
new file mode 100644
index 0000000..fbaea11
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Base class for client reconnect tests: starts the grids and installs SPIs that can hold back or drop messages.
+ */
+public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Maximum time to wait for a disconnect/reconnect event, in milliseconds. */
+    private static final long RECONNECT_TIMEOUT = 10_000;
+
+    /** If {@code true}, {@link #getConfiguration(String)} configures the node as a client. */
+    protected boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); // Discovery SPI able to hold back join requests.
+
+        disco.setIpFinder(ipFinder);
+        disco.setJoinTimeout(2 * 60_000);
+        disco.setSocketTimeout(1000);
+        disco.setNetworkTimeout(2000);
+
+        cfg.setDiscoverySpi(disco);
+
+        BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi(); // Can drop messages of a chosen class.
+
+        commSpi.setSharedMemoryPort(-1); // NOTE(review): presumably turns shared memory communication off - confirm.
+
+        cfg.setCommunicationSpi(commSpi);
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * @param latch Latch to await for up to {@code RECONNECT_TIMEOUT} ms; on timeout threads are dumped and test fails.
+     * @throws Exception If failed.
+     */
+    protected void waitReconnectEvent(CountDownLatch latch) throws Exception {
+        if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) {
+            log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
+
+            U.dumpThreads(log);
+
+            fail("Failed to wait for disconnect/reconnect event.");
+        }
+    }
+
+    /**
+     * @return Number of server nodes started before tests (no servers are started if not positive).
+     */
+    protected abstract int serverCount();
+
+    /**
+     * @return Number of client nodes started before tests (none by default).
+     */
+    protected int clientCount() {
+        return 0;
+    }
+
+    /**
+     * @param ignite Node.
+     * @return Discovery SPI from the node configuration, cast to the test implementation.
+     */
+    protected TestTcpDiscoverySpi spi(Ignite ignite) {
+        return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+    }
+
+    /**
+     * @param ignite Node.
+     * @return Communication SPI from the node configuration, cast to the test implementation.
+     */
+    protected BlockTpcCommunicationSpi commSpi(Ignite ignite) {
+        return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        int srvs = serverCount();
+
+        if (srvs > 0)
+            startGrids(srvs);
+
+        int clients = clientCount();
+
+        if (clients > 0) {
+            clientMode = true; // getConfiguration() reads this flag, so the grids started next are clients.
+
+            startGridsMultiThreaded(srvs, clients); // Presumably client indexes start right after the servers.
+
+            clientMode = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids(); // Counterpart of the grid startup done in beforeTestsStarted().
+    }
+
+    /**
+     * @param client Client node; asserted to be a client with a non-null router node ID.
+     * @return Server node the client is connected to.
+     */
+    protected Ignite clientRouter(Ignite client) {
+        TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+        assertTrue(node.isClient());
+        assertNotNull(node.clientRouterNodeId());
+
+        Ignite srv = G.ignite(node.clientRouterNodeId());
+
+        assertNotNull(srv);
+
+        return srv;
+    }
+
+    /**
+     * @param fut Future that must be non-null and not yet completed.
+     * @throws Exception If getting the result of an already completed future failed.
+     */
+    protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception {
+        assertNotNull(fut);
+
+        if (fut.isDone())
+            fail("Future completed with result: " + fut.get());
+    }
+
+    /**
+     * Fails the client node on the server and waits until the client has disconnected and then reconnected.
+     *
+     * @param client Client.
+     * @param srv Server whose discovery SPI is used to fail the client.
+     * @param disconnectedC Optional closure run after the disconnect event, while the join request is still blocked.
+     * @throws Exception If failed.
+     */
+    protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
+        throws Exception {
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+        final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1); // TestTcpDiscoverySpi holds join requests until it is released.
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null); // Makes the server treat the client as failed.
+
+        waitReconnectEvent(disconnectLatch);
+
+        if (disconnectedC != null)
+            disconnectedC.run();
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown(); // Release the blocked join request.
+
+        waitReconnectEvent(reconnectLatch);
+
+        client.events().stopLocalListen(p); // NOTE(review): not in finally - listener stays if a wait above fails.
+    }
+
+    /**
+     * @param e Cache exception whose cause must be an {@link IgniteClientDisconnectedException}.
+     * @return Non-null reconnect future taken from the cause.
+     */
+    protected IgniteFuture<?> check(CacheException e) {
+        log.info("Expected exception: " + e);
+
+        if (!(e.getCause() instanceof IgniteClientDisconnectedException))
+            log.error("Unexpected cause: " + e.getCause(), e); // Log the full stack trace before the assert fails.
+
+        assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException);
+
+        IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+
+        assertNotNull(e0.reconnectFuture());
+
+        return e0.reconnectFuture();
+    }
+
+    /**
+     * @param e Cache exception validated by {@code check(CacheException)}; its reconnect future is then awaited.
+     */
+    protected void checkAndWait(CacheException e) {
+        check(e).get();
+    }
+
+    /**
+     * @param e Client disconnected exception whose non-null reconnect future is awaited.
+     */
+    protected void checkAndWait(IgniteClientDisconnectedException e) {
+        log.info("Expected exception: " + e);
+
+        assertNotNull(e.reconnectFuture());
+
+        e.reconnectFuture().get();
+    }
+
+    /**
+     * Discovery SPI that can hold back sending of join requests until a latch is released.
+     */
+    protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** While non-null, join request writes wait on this latch; {@code null} means no blocking. */
+        volatile CountDownLatch writeLatch;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+            throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                CountDownLatch writeLatch0 = writeLatch; // Single read of the volatile field.
+
+                if (writeLatch0 != null) {
+                    log.info("Block join request send: " + msg);
+
+                    U.await(writeLatch0);
+                }
+            }
+
+            super.writeToSocket(sock, msg);
+        }
+    }
+
+    /**
+     * Communication SPI that can silently drop messages of one class and record the classes of sent messages.
+     */
+    protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi {
+        /** Class of messages to drop, {@code null} if nothing is blocked. */
+        volatile Class msgCls;
+
+        /** Set to {@code true} by {@link #start()} to enable recording of sent message classes. */
+        AtomicBoolean collectStart = new AtomicBoolean(false);
+
+        /** Recorded message class names mapped to the last node a message of that class was sent to. */
+        ConcurrentHashMap<String, ClusterNode> classes = new ConcurrentHashMap<>();
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            Class msgCls0 = msgCls; // Single read of the volatile field; used for both checks below.
+
+            if (collectStart.get() && msg instanceof GridIoMessage)
+                classes.put(((GridIoMessage)msg).message().getClass().getName(), node);
+
+            if (msgCls0 != null && msg instanceof GridIoMessage
+                && ((GridIoMessage)msg).message().getClass().equals(msgCls0)) {
+                log.info("Block message: " + msg);
+
+                return;
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @param clazz Class of messages which will be blocked.
+         */
+        public void blockMessage(Class clazz) {
+            msgCls = clazz;
+        }
+
+        /**
+         * Stops blocking messages.
+         */
+        public void unblockMessage() {
+            msgCls = null;
+        }
+
+        /**
+         * Starts collecting class names of sent messages.
+         */
+        public void start() {
+            collectStart.set(true);
+        }
+
+        /**
+         * Prints collected message class names (logged at error level).
+         */
+        public void print() {
+            for (String s : classes.keySet())
+                log.error(s);
+        }
+    }
+}


Mime
View raw message