ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] incubator-ignite git commit: # ignite-901 client reconnect WIP
Date Wed, 01 Jul 2015 15:11:14 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 [created] f5f3efd16


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 1071ef2..5a898b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -97,17 +97,62 @@ public class GridCacheSharedContext<K, V> {
         Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
+
+        setManagers(txMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+
+        this.storeSesLsnrs = storeSesLsnrs;
+
+        txMetrics = new TransactionMetricsAdapter();
+
+        ctxMap = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    void onDisconnected() throws IgniteCheckedException {
+        for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size());
+             it.hasPrevious();) {
+            GridCacheSharedManager<?, ?> mgr = it.previous();
+
+            if (mgr.restartOnDisconnect())
+                mgr.onKernalStop(true, true);
+        }
+
+        for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+            GridCacheSharedManager<?, ?> mgr = it.previous();
+
+            if (mgr.restartOnDisconnect())
+                mgr.stop(true);
+        }
+
+        mgrs = new LinkedList<>();
+
+        setManagers(txMgr,
+            verMgr,
+            mvccMgr,
+            new GridCacheDeploymentManager<K, V>(),
+            new GridCachePartitionExchangeManager<K, V>(),
+            ioMgr);
+
+        for (GridCacheSharedManager<K, V> mgr : mgrs) {
+            if (mgr.restartOnDisconnect())
+                mgr.start(this);
+        }
+    }
+
+    private void setManagers(IgniteTxManager txMgr,
+        GridCacheVersionManager verMgr,
+        GridCacheMvccManager mvccMgr,
+        GridCacheDeploymentManager<K, V> depMgr,
+        GridCachePartitionExchangeManager<K, V> exchMgr,
+        GridCacheIoManager ioMgr) {
         this.mvccMgr = add(mvccMgr);
         this.verMgr = add(verMgr);
         this.txMgr = add(txMgr);
         this.depMgr = add(depMgr);
         this.exchMgr = add(exchMgr);
         this.ioMgr = add(ioMgr);
-        this.storeSesLsnrs = storeSesLsnrs;
-
-        txMetrics = new TransactionMetricsAdapter();
-
-        ctxMap = new ConcurrentHashMap<>();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index d45052c..5d27657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -46,7 +46,7 @@ public interface GridCacheSharedManager <K, V> {
     /**
      * @param cancel Cancel flag.
      */
-    public void onKernalStop(boolean cancel);
+    public void onKernalStop(boolean cancel, boolean disconnected);
 
     /**
      * Prints memory statistics (sizes of internal data structures, etc.).
@@ -54,4 +54,9 @@ public interface GridCacheSharedManager <K, V> {
      * NOTE: this method is for testing and profiling purposes only.
      */
     public void printMemoryStats();
+
+    /**
+     *
+     */
+    public boolean restartOnDisconnect();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 2cf7051..61dbc25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -101,14 +101,14 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
     }
 
     /** {@inheritDoc} */
-    @Override public final void onKernalStop(boolean cancel) {
+    @Override public final void onKernalStop(boolean cancel, boolean disconnected) {
         if (!starting.get())
             // Ignoring attempt to stop manager that has never been started.
             return;
 
-        onKernalStop0(cancel);
+        onKernalStop0(cancel, disconnected);
 
-        if (log != null && log.isDebugEnabled())
+        if (!disconnected && log != null && log.isDebugEnabled())
             log.debug(kernalStopInfo());
     }
 
@@ -121,8 +121,9 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
 
     /**
      * @param cancel Cancel flag.
+     * @param disconnected Disconnected flag.
      */
-    protected void onKernalStop0(boolean cancel) {
+    protected void onKernalStop0(boolean cancel, boolean disconnected) {
         // No-op.
     }
 
@@ -160,6 +161,11 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
     }
 
     /** {@inheritDoc} */
+    @Override public boolean restartOnDisconnect() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheSharedManagerAdapter.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 38a0d55..0369eb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -458,6 +458,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (isDone())
             return;
 
+        log.info("Init exchange: " + exchangeId());
+
         if (init.compareAndSet(false, true)) {
             if (isDone())
                 return;
@@ -1024,6 +1026,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.exchange().onExchangeDone(this, err);
 
         if (super.onDone(res, err) && !dummy && !forcePreload) {
+            log.info("Finished exchange: " + exchangeId() + ", err=" + err);
+
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0355bb3..969d7a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -180,13 +180,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         topVer.setIfGreater(startTopVer);
 
-        // Generate dummy discovery event for local node joining.
-        DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
-
-        assert discoEvt != null;
-
-        assert discoEvt.topologyVersion() == startTopVer;
-
         supplyPool.start();
         demandPool.start();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 351d6cd..a7e3f4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -90,6 +90,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     public abstract GridDhtCacheAdapter<K, V> dht();
 
     /** {@inheritDoc} */
+    @Override public void disconnected() {
+        map = new GridCacheConcurrentMap(ctx, ctx.config().getNearConfiguration().getNearStartSize(), 0.75F);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isNear() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d59a51d..39f6bd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -552,7 +552,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         // Creates task session with task name and task version.
         GridTaskSessionImpl ses = ctx.session().createTaskSession(
             sesId,
-            ctx.config().getNodeId(),
+            ctx.localNodeId(),
             taskName,
             dep,
             taskCls == null ? null : taskCls.getName(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 5e557bd..dd19203 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -58,9 +58,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Ignite instance. */
     protected Ignite ignite;
 
-    /** Local node id. */
-    protected UUID nodeId;
-
     /** Grid instance name. */
     protected String gridName;
 
@@ -111,7 +108,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
     /** {@inheritDoc} */
     @Override public UUID getLocalNodeId() {
-        return nodeId;
+        return ignite.cluster().localNode().id();
     }
 
     /** {@inheritDoc} */
@@ -201,10 +198,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     protected void injectResources(Ignite ignite) {
         this.ignite = ignite;
 
-        if (ignite != null) {
-            nodeId = ignite.configuration().getNodeId();
+        if (ignite != null)
             gridName = ignite.name();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..5eaca21 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -248,7 +248,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
-                    ses.send(nodeIdMsg);
+                    ses.send(nodeIdMessage());
                 }
             }
 
@@ -700,9 +700,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
-    /** Local node ID message. */
-    private NodeIdMessage nodeIdMsg;
-
     /** Received messages count. */
     private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
 
@@ -739,10 +736,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
-            assert evt instanceof DiscoveryEvent;
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+            assert evt instanceof DiscoveryEvent : evt;
+            assert evt.type() == EVT_NODE_LEFT ||
+                evt.type() == EVT_NODE_FAILED ||
+                evt.type() == EVT_CLIENT_NODE_DISCONNECTED : evt;
+
+            if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                for (GridCommunicationClient client : clients.values())
+                    client.forceClose();
+
+                IgniteCheckedException err = new IgniteCheckedException("Failed to connect to node, " +
+                    "local node node disconnected.");
 
-            onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+                for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
+                    clientFut.onDone(err);
+
+                recoveryDescs.clear();
+            }
+            else
+                onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
         }
     };
 
@@ -1237,8 +1249,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
-        nodeIdMsg = new NodeIdMessage(getLocalNodeId());
-
         assertParameter(locPort > 1023, "locPort > 1023");
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1371,7 +1381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (boundTcpShmemPort > 0)
             spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
 
-        spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CLIENT_NODE_DISCONNECTED);
 
         ctxInitLatch.countDown();
     }
@@ -1666,10 +1676,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
 
-        UUID locNodeId = getLocalNodeId();
-
-        if (node.id().equals(locNodeId))
-            notifyListener(locNodeId, msg, NOOP);
+        if (node.isLocal())
+            notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;
 
@@ -2208,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
+                        HandshakeMessage msg = new HandshakeMessage(getSpiContext().localNode().id(),
                             recovery.incrementConnectCount(),
                             recovery.receivedCount());
 
@@ -2228,7 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ch.write(buf);
                     }
                     else
-                        ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+                        ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
 
                     if (recovery != null) {
                         if (log.isDebugEnabled())
@@ -2355,6 +2363,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         getExceptionRegistry().onException(msg, e);
     }
 
+    /**
+     * @return Node ID message.
+     */
+    private NodeIdMessage nodeIdMessage() {
+        return new NodeIdMessage(getSpiContext().localNode().id());
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpCommunicationSpi.class, this);
@@ -2860,15 +2875,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
+                UUID id = getSpiContext().localNode().id();
+
+                NodeIdMessage msg = new NodeIdMessage(id);
+
                 out.write(U.IGNITE_HEADER);
                 out.write(NODE_ID_MSG_TYPE);
-                out.write(nodeIdMsg.nodeIdBytes);
+                out.write(msg.nodeIdBytes);
 
                 out.flush();
 
                 if (log.isDebugEnabled())
-                    log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId="
-                        + rmtNodeId + ']');
+                    log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
             }
             catch (IOException e) {
                 throw new IgniteCheckedException("Failed to perform handshake.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 04276d2..4b1cfa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,9 +70,6 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private SocketReader sockReader;
 
-    /** */
-    private boolean segmented;
-
     /** Last message ID. */
     private volatile IgniteUuid lastMsgId;
 
@@ -325,7 +322,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
-        if (segmented)
+        if (state == State.SEGMENTED)
             throw new IgniteException("Failed to send custom message: client is disconnected");
 
         try {
@@ -339,7 +336,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void failNode(UUID nodeId, @Nullable String warning) {
-        ClusterNode node = rmtNodes.get(nodeId);
+        TcpDiscoveryNode node = rmtNodes.get(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
@@ -366,7 +363,8 @@ class ClientImpl extends TcpDiscoveryImpl {
         long startTime = U.currentTimeMillis();
 
         // Marshal credentials for backward compatibility and security.
-        marshalCredentials(locNode);
+        if (!recon)
+            marshalCredentials(locNode);
 
         while (true) {
             if (Thread.currentThread().isInterrupted())
@@ -947,7 +945,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            assert !segmented;
+            assert state == ClientImpl.State.DISCONNECTED
+                || state == ClientImpl.State.CONNECTED
+                || state == ClientImpl.State.STARTING :
+                state;
 
             boolean success = false;
 
@@ -1007,9 +1008,11 @@ class ClientImpl extends TcpDiscoveryImpl {
                                         }
 
                                         success = true;
-                                    }
 
-                                    return;
+                                        return;
+                                    }
+                                    else // TODO IGNITE-901 reuse socket.
+                                        return;
                                 }
                             }
                             else if (spi.ensured(msg)) {
@@ -1090,45 +1093,33 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** {@inheritDoc} */
         @SuppressWarnings("InfiniteLoopStatement")
         @Override protected void body() throws InterruptedException {
+            state = ClientImpl.State.STARTING;
+
             spi.stats.onJoinStarted();
 
             try {
-                final Socket sock = joinTopology(false, spi.joinTimeout);
-
-                if (sock == null) {
-                    joinError(new IgniteSpiException("Join process timed out."));
-
-                    return;
-                }
-
-                currSock = sock;
-
-                sockWriter.setSocket(sock);
-
-                if (spi.joinTimeout > 0) {
-                    timer.schedule(new TimerTask() {
-                        @Override public void run() {
-                            if (joinLatch.getCount() > 0)
-                                queue.add(JOIN_TIMEOUT);
-                        }
-                    }, spi.joinTimeout);
-                }
-
-                sockReader.setSocket(sock, locNode.clientRouterNodeId());
+                tryJoin();
 
                 while (true) {
                     Object msg = queue.take();
 
                     if (msg == JOIN_TIMEOUT) {
-                        if (joinLatch.getCount() > 0) {
+                        if (state == ClientImpl.State.STARTING) {
                             joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
 
                             break;
                         }
+                        else if (state == ClientImpl.State.DISCONNECTED) {
+                            state = ClientImpl.State.SEGMENTED;
+
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        }
                     }
                     else if (msg == SPI_STOP) {
+                        state = ClientImpl.State.STOPPED;
+
                         assert spi.getSpiContext().isStopping();
 
                         if (currSock != null) {
@@ -1147,7 +1138,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             boolean join = joinLatch.getCount() > 0;
 
-                            if (spi.getSpiContext().isStopping() || segmented) {
+                            if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) {
                                 leaveLatch.countDown();
 
                                 if (join) {
@@ -1166,19 +1157,31 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                     else if (msg == SPI_RECONNECT_FAILED) {
-                        if (!segmented) {
-                            segmented = true;
+                        reconnector.cancel();
+                        reconnector.join();
 
-                            reconnector.cancel();
-                            reconnector.join();
+                        reconnector = null;
 
-                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
-                        }
+                        state = ClientImpl.State.DISCONNECTED;
+
+                        nodeAdded = false;
+
+                        notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+                        UUID newId = UUID.randomUUID();
+
+                        log.info("Change node id: " + newId);
+
+                        rmtNodes.clear();
+
+                        locNode.onClientDisconnected(newId);
+
+                        tryJoin();
                     }
                     else {
                         TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
 
-                        if (joinLatch.getCount() > 0) {
+                        if (joinLatch.getCount() > 0) { // TODO IGNITE-901.
                             IgniteSpiException err = null;
 
                             if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1214,6 +1217,44 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @throws InterruptedException If interrupted.
+         */
+        private void tryJoin() throws InterruptedException {
+            assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state;
+
+            boolean join = state == ClientImpl.State.STARTING;
+
+            final Socket sock = joinTopology(false, spi.joinTimeout);
+
+            if (sock == null) {
+                if (join)
+                    joinError(new IgniteSpiException("Join process timed out."));
+                else {
+                    state = ClientImpl.State.SEGMENTED;
+
+                    notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                }
+
+                return;
+            }
+
+            currSock = sock;
+
+            sockWriter.setSocket(sock);
+
+            if (spi.joinTimeout > 0) {
+                timer.schedule(new TimerTask() {
+                    @Override public void run() {
+                        if (joinLatch.getCount() > 0)
+                            queue.add(JOIN_TIMEOUT);
+                    }
+                }, spi.joinTimeout);
+            }
+
+            sockReader.setSocket(sock, locNode.clientRouterNodeId());
+        }
+
+        /**
          * @param msg Message.
          */
         protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
@@ -1244,6 +1285,16 @@ class ClientImpl extends TcpDiscoveryImpl {
             spi.stats.onMessageProcessingFinished(msg);
         }
 
+        private boolean nodeAdded;
+
+        private boolean joining() {
+            return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
+        }
+
+        private boolean disconnected() {
+            return state == ClientImpl.State.DISCONNECTED;
+        }
+
         /**
          * @param msg Message.
          */
@@ -1256,12 +1307,15 @@ class ClientImpl extends TcpDiscoveryImpl {
             UUID newNodeId = node.id();
 
             if (getLocalNodeId().equals(newNodeId)) {
-                if (joinLatch.getCount() > 0) {
+                if (joining()) {
                     Collection<TcpDiscoveryNode> top = msg.topology();
 
                     if (top != null) {
                         spi.gridStartTime = msg.gridStartTime();
 
+                        if (disconnected())
+                            rmtNodes.clear();
+
                         for (TcpDiscoveryNode n : top) {
                             if (n.order() > 0)
                                 n.visible(true);
@@ -1271,6 +1325,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         topHist.clear();
 
+                        nodeAdded = true;
+
                         if (msg.topologyHistory() != null)
                             topHist.putAll(msg.topologyHistory());
                     }
@@ -1308,7 +1364,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
 
             if (getLocalNodeId().equals(msg.nodeId())) {
-                if (joinLatch.getCount() > 0) {
+                if (joining()) {
                     Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
 
                     if (dataMap != null) {
@@ -1323,7 +1379,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     locNode.order(topVer);
 
-                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
+                    Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
+
+                    if (disconnected())
+                        notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
+
+                    state = ClientImpl.State.CONNECTED;
 
                     joinErr.set(null);;
 
@@ -1437,7 +1500,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @return {@code True} if received node added message for local node.
          */
         private boolean nodeAdded() {
-            return !topHist.isEmpty();
+            return nodeAdded;
         }
 
         /**
@@ -1582,7 +1645,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            if (msg.verified() && joinLatch.getCount() == 0) {
+            if (msg.verified() && state == ClientImpl.State.CONNECTED) {
                 DiscoverySpiListener lsnr = spi.lsnr;
 
                 if (lsnr != null) {
@@ -1718,4 +1781,18 @@ class ClientImpl extends TcpDiscoveryImpl {
             this.sock = sock;
         }
     }
+
+    private volatile State state;
+
+    private enum State {
+        STARTING,
+
+        CONNECTED,
+
+        DISCONNECTED,
+
+        SEGMENTED,
+
+        STOPPED
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index fa3e564..05f710d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -585,11 +585,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void failNode(UUID nodeId, @Nullable String warning) {
-        ClusterNode node = ring.node(nodeId);
+        TcpDiscoveryNode node = ring.node(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
-                node.id(), node.order());
+                node.id(), node.internalOrder());
 
             msg.warning(warning);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..4cb0b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -112,7 +112,7 @@ abstract class TcpDiscoveryImpl {
      * @return Local node ID.
      */
     public UUID getLocalNodeId() {
-        return spi.getLocalNodeId();
+        return spi.locNode.id();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..9446d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -844,7 +844,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         }
 
         locNode = new TcpDiscoveryNode(
-            getLocalNodeId(),
+            ignite.configuration().getNodeId(),
             addrs.get1(),
             addrs.get2(),
             srvPort,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 36ae39e..d0c3edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -431,12 +431,29 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     }
 
     /**
+     * @return Metrics provider.
+     */
+    public DiscoveryMetricsProvider metricsProvider() {
+        return metricsProvider;
+    }
+
+    /**
      * @param clientRouterNodeId Client router node ID.
      */
     public void clientRouterNodeId(UUID clientRouterNodeId) {
         this.clientRouterNodeId = clientRouterNodeId;
     }
 
+    /**
+     * @param newId New node ID to assign; order, internal order and visible flag are reset along with it.
+     */
+    public void onClientDisconnected(UUID newId) {
+        id = newId;
+        order = 0;
+        intOrder = 0;
+        visible = false;
+    }
+
     /** {@inheritDoc} */
     @Override public int compareTo(@Nullable TcpDiscoveryNode node) {
         if (node == null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 7a88426..000782a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);
 
-        String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId();
+        String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId();
 
         try {
             dir = U.resolveWorkDirectory(path, true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index abc9109..5f2d2b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -104,8 +104,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
      * Test kernal gateway that always return uninitialized user stack trace.
      */
     private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() {
-        @Override public void lightCheck() throws IllegalStateException {}
-
         @Override public void readLock() throws IllegalStateException {}
 
         @Override public void readLockAnyway() {}
@@ -122,10 +120,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
 
         @Override public void writeUnlock() {}
 
-        @Override public void addStopListener(Runnable lsnr) {}
-
-        @Override public void removeStopListener(Runnable lsnr) {}
-
         @Override public String userStackTrace() {
             return null;
         }
@@ -133,5 +127,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
         @Override public boolean tryWriteLock(long timeout) {
             return false;
         }
+
+        @Override public void onDisconnected() {
+            // No-op.
+        }
+
+        @Override public void onReconnected() {
+            // No-op.
+        }
     };
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
new file mode 100644
index 0000000..0512074
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+
+/**
+ * Base class for client reconnect tests: installs {@link TestTcpDiscoverySpi} and starts the server nodes.
+ */
+public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** If {@code true}, the next configured node is put into client mode. */
+    protected boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+        disco.setJoinTimeout(2 * 60_000);
+
+        cfg.setDiscoverySpi(disco);
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * @return Number of server nodes started before tests.
+     */
+    protected abstract int serverCount();
+
+    /**
+     * @param ignite Node.
+     * @return Discovery SPI.
+     */
+    protected TestTcpDiscoverySpi spi(Ignite ignite) {
+        return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        int srvs = serverCount();
+
+        if (srvs > 0) // Subclasses returning 0 start their servers themselves.
+            startGrids(srvs);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @param client Client.
+     * @return Server node client connected to.
+     */
+    protected Ignite clientRouter(Ignite client) {
+        TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+        assertTrue(node.isClient());
+        assertNotNull(node.clientRouterNodeId());
+
+        Ignite srv = G.ignite(node.clientRouterNodeId());
+
+        assertNotNull(srv);
+
+        return srv;
+    }
+
+    /**
+     * @param fut Future that is expected to be not completed yet.
+     * @throws Exception If failed.
+     */
+    protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception {
+        assertNotNull(fut);
+
+        if (fut.isDone())
+            fail("Future completed with result: " + fut.get());
+    }
+
+    /**
+     * Discovery SPI that can hold back join requests before they are written to the socket.
+     */
+    protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** If not {@code null}, join requests wait on this latch before being written. */
+        volatile CountDownLatch writeLatch;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+            throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                CountDownLatch writeLatch0 = writeLatch; // Read the volatile field once.
+
+                if (writeLatch0 != null) {
+                    log.info("Block join request send: " + msg);
+
+                    U.await(writeLatch0);
+                }
+            }
+
+            super.writeToSocket(sock, msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
new file mode 100644
index 0000000..164f6c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Checks that public API calls made while the client is disconnected block until it reconnects.
+ */
+public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testIgniteBlockOnDisconnect() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(serverCount());
+
+        assertNotNull(client.cache(null));
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        final List<IgniteInternalFuture> futs = new ArrayList<>();
+
+        // TODO IGNITE-901 test blocking for other public APIs.
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    futs.add(GridTestUtils.runAsync(new Callable() {
+                        @Override public Object call() throws Exception {
+                            return client.transactions();
+                        }
+                    }));
+
+                    futs.add(GridTestUtils.runAsync(new Callable() {
+                        @Override public Object call() throws Exception {
+                            return client.cache(null);
+                        }
+                    }));
+
+                    futs.add(GridTestUtils.runAsync(new Callable() {
+                        @Override public Object call() throws Exception {
+                            return client.dataStreamer(null);
+                        }
+                    }));
+
+                    disconnectLatch.countDown(); // Counted down only after all three calls were submitted.
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        log.info("Fail client.");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals(3, futs.size());
+
+        for (IgniteInternalFuture<?> fut : futs)
+            assertNotDone(fut); // API calls must not return while the client is disconnected.
+
+        U.sleep(2000);
+
+        for (IgniteInternalFuture<?> fut : futs)
+            assertNotDone(fut); // Still blocked: the join request is held by writeLatch.
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        IgniteTransactions txs = (IgniteTransactions)futs.get(0).get(); // Futures are in submission order.
+
+        assertNotNull(txs);
+
+        IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)futs.get(1).get();
+
+        assertNotNull(cache0);
+
+        cache0.put(1, 1);
+
+        assertEquals(1, cache0.get(1));
+
+        IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)futs.get(2).get();
+
+        streamer.addData(2, 2);
+
+        streamer.close();
+
+        assertEquals(2, cache0.get(2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
new file mode 100644
index 0000000..5687010
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests cache operations while a client node is disconnected and after it reconnects.
+ */
+public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest {
+    /** Number of server nodes started before each test. */
+    private final int SRV_CNT = 1;
+
+    /** Node ID to set in the next node configuration, if not {@code null}. */
+    private UUID nodeId;
+
+    /** Local event listeners to set in the next node configuration, if not {@code null}. */
+    private Map<IgnitePredicate<? extends Event>, int[]> lsnrs;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        if (nodeId != null) {
+            cfg.setNodeId(nodeId);
+
+            nodeId = null; // One-shot: applies to a single configuration only.
+        }
+
+        if (lsnrs != null) {
+            cfg.setLocalEventListeners(lsnrs);
+
+            lsnrs = null; // One-shot: applies to a single configuration only.
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 0; // No servers are started once per class; beforeTest() starts them for every test.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(SRV_CNT); // Server node(s) are started anew for every test.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(); // Nodes are not reused between tests.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        clientMode = true;
+
+        Ignite client = startGrid(SRV_CNT);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        cache.put(1, 1);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1); // Client join requests wait on this latch.
+
+        final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    blockPutRef.set(GridTestUtils.runAsync(new Callable() {
+                        @Override
+                        public Object call() throws Exception {
+                            log.info("Start put.");
+
+                            cache.put(2, 2); // Expected to block until the client reconnects.
+
+                            log.info("Finish put.");
+
+                            return null;
+                        }
+                    }));
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        log.info("Fail client.");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        IgniteInternalFuture putFut = blockPutRef.get();
+
+        assertNotDone(putFut);
+
+        U.sleep(5000);
+
+        assertNotDone(putFut); // The put must stay blocked for as long as reconnect is blocked.
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals(1, cache.get(1)); // Value written before the disconnect.
+
+        putFut.get();
+
+        assertEquals(2, cache.get(2)); // Value written by the put that was blocked.
+
+        cache.put(3, 3);
+
+        assertEquals(3, cache.get(3));
+
+        this.clientMode = false; // The next node is started as a server.
+
+        IgniteEx srv2 = startGrid(SRV_CNT + 1);
+
+        Integer key = primaryKey(srv2.cache(null)); // Presumably a key with the new server as primary.
+
+        cache.put(key, 4);
+
+        assertEquals(4, cache.get(key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectExchangeInProgress() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id());
+
+        clientMode = false;
+
+        startGrid(SRV_CNT + 1); // Presumably starts an exchange the client cannot finish while blocked.
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        try {
+            srvCommSpi.stopBlock(true); // Expected to throw, see the catch block below.
+
+            fail();
+        }
+        catch (IgniteException e) {
+            log.info("Expected error: " + e);
+        }
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("newCache");
+
+        ccfg.setCacheMode(REPLICATED);
+
+        log.info("Start new cache.");
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInitialExchangeInProgress() throws Exception {
+        final UUID clientId = UUID.randomUUID(); // Chosen up front so blocking can be set up before the client starts.
+
+        Ignite srv = grid(0);
+
+        final CountDownLatch joinLatch = new CountDownLatch(1);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) {
+                    info("Client joined: " + evt);
+
+                    joinLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_NODE_JOINED);
+
+        TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId);
+
+        clientMode = true;
+
+        nodeId = clientId; // Consumed by getConfiguration() for the next started node.
+
+        lsnrs = new HashMap<>(); // Consumed by getConfiguration() as well.
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, new int[]{EVT_CLIENT_NODE_RECONNECTED});
+
+        IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+            @Override
+            public Ignite call() throws Exception {
+                try {
+                    return startGrid(SRV_CNT);
+                } catch (Throwable e) {
+                    log.error("Unexpected error: " + e, e);
+
+                    throw e;
+                }
+            }
+        });
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        assertTrue(joinLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        U.sleep(1000);
+
+        assertNotDone(fut); // Client start must not complete while the full message is blocked.
+
+        srvSpi.failNode(clientId, null);
+
+        log.info("Wait reconnect.");
+
+        // NOTE(review): 10 minute timeout below is far larger than the 5 seconds used by the other tests.
+        assertTrue(reconnectLatch.await(10 * 60_000, TimeUnit.MILLISECONDS));
+
+        try {
+            srvCommSpi.stopBlock(true); // Expected to throw, see the catch block below.
+
+            fail();
+        }
+        catch (IgniteException e) {
+            log.info("Expected error: " + e);
+        }
+
+        Ignite client = fut.get();
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("newCache");
+
+        ccfg.setCacheMode(REPLICATED);
+
+        log.info("Start new cache.");
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectOperationInProgress() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                    info("Client disconnected: " + evt);
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED)
+                    info("Client reconnected: " + evt);
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        IgniteInClosure<IgniteCache<Object, Object>> putOp = new CI1<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                cache.put(1, 1);
+            }
+        };
+
+        IgniteInClosure<IgniteCache<Object, Object>> getOp = new CI1<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                cache.get(1);
+            }
+        };
+
+        int cnt = 0; // Makes cache names unique across configurations.
+
+        for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
+            CacheAtomicWriteOrderMode[] writeOrders =
+                atomicityMode == CacheAtomicityMode.ATOMIC ? CacheAtomicWriteOrderMode.values() :
+                new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK}; // Single pass for non-atomic caches.
+
+            for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
+                for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+                    CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+                    ccfg.setAtomicityMode(atomicityMode);
+
+                    ccfg.setAtomicWriteOrderMode(writeOrder);
+
+                    ccfg.setName("cache-" + cnt++);
+
+                    ccfg.setWriteSynchronizationMode(syncMode);
+
+                    if (syncMode != CacheWriteSynchronizationMode.FULL_ASYNC) { // Presumably async puts await no response.
+                        Class<?> cls = (ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC) ?
+                            GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
+
+                        log.info("Test cache put [atomicity=" + atomicityMode +
+                            ", writeOrder=" + writeOrder +
+                            ", syncMode=" + syncMode + ']');
+
+                        checkOperationInProgressFails(client, ccfg, cls, putOp);
+                    }
+
+                    log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
+
+                    checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param client Client that is failed while the operation is in progress.
+     * @param ccfg Configuration of the cache to create for the check (destroyed at the end).
+     * @param msgToBlock Class of the server message to block for the client.
+     * @param c Cache operation closure, expected to fail once the client is failed.
+     * @throws Exception If failed.
+     */
+    private void checkOperationInProgressFails(IgniteEx client,
+        final CacheConfiguration<Object, Object> ccfg,
+        Class<?> msgToBlock,
+        final IgniteInClosure<IgniteCache<Object, Object>> c)
+        throws Exception
+    {
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                c.apply(cache);
+
+                return null;
+            }
+        });
+
+        Thread.sleep(1000);
+
+        assertNotDone(fut); // Operation must hang while the message is blocked.
+
+        log.info("Fail client: " + client.localNode().id());
+
+        srvSpi.failNode(client.localNode().id(), null);
+
+        try {
+            fut.get(); // The in-progress operation is expected to fail.
+
+            fail();
+        }
+        catch (IgniteCheckedException e) {
+            log.info("Expected error: " + e);
+        }
+
+        srvCommSpi.stopBlock(false);
+
+        cache.put(1, 1); // The same cache instance is used again after the failure.
+
+        assertEquals(1, cache.get(1));
+
+        client.destroyCache(cache.getName());
+    }
+
+    /**
+     * Communication SPI that can hold back messages of given classes addressed to given nodes.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Messages held back while blocking is enabled, paired with their destination nodes. */
+        private final List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+        /** Message class mapped to IDs of the nodes this class of messages is not sent to. */
+        private final Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                synchronized (this) {
+                    Set<UUID> blockNodes = blockCls.get(msg0.getClass());
+
+                    if (F.contains(blockNodes, node.id())) {
+                        log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                            ", msg=" + msg0 + ']');
+
+                        blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                        return; // Message is queued instead of being sent.
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @param cls Class of the messages to block.
+         * @param nodeId ID of the destination node to block the messages for.
+         */
+        void blockMessages(Class<?> cls, UUID nodeId) {
+            synchronized (this) {
+                Set<UUID> set = blockCls.get(cls);
+
+                if (set == null) {
+                    set = new HashSet<>();
+
+                    blockCls.put(cls, set);
+                }
+
+                set.add(nodeId);
+            }
+        }
+
+        /**
+         * @param snd If {@code true} the blocked messages are sent, otherwise they are dropped.
+         */
+        void stopBlock(boolean snd) {
+            synchronized (this) {
+                blockCls.clear();
+
+                if (snd) {
+                    for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                        ClusterNode node = msg.get1();
+
+                        log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                            ", msg=" + msg.get2().message() + ']');
+
+                        super.sendMessage(node, msg.get2());
+                    }
+                }
+
+                blockedMsgs.clear();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
new file mode 100644
index 0000000..deffd42
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Checks discovery state (local node order, topology history, node local map) after client reconnect.
+ */
+public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(serverCount());
+
+        long topVer = 4; // Presumably 3 servers (see serverCount()) plus this client; asserted below.
+
+        IgniteCluster cluster = client.cluster();
+
+        cluster.nodeLocalMap().put("locMapKey", 10);
+
+        Map<Integer, Integer> nodeCnt = new HashMap<>(); // Topology version -> expected number of nodes.
+
+        nodeCnt.put(1, 1);
+        nodeCnt.put(2, 2);
+        nodeCnt.put(3, 3);
+        nodeCnt.put(4, 4);
+
+        for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+            Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+            assertEquals((int)e.getValue(), nodes.size());
+        }
+
+        ClusterNode locNode = cluster.localNode();
+
+        assertEquals(topVer, locNode.order());
+
+        TestTcpDiscoverySpi srvSpi = spi(clientRouter(client));
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                    info("Disconnected: " + evt);
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null); // Fail the client via its router's SPI.
+
+        assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+
+        topVer += 2; // Client failed and rejoined.
+
+        locNode = cluster.localNode();
+
+        assertEquals(topVer, locNode.order());
+        assertEquals(topVer, cluster.topologyVersion());
+
+        nodeCnt.put(5, 3); // Version 5: client failed, servers only.
+        nodeCnt.put(6, 4); // Version 6: client rejoined.
+
+        for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+            Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+            assertEquals((int)e.getValue(), nodes.size());
+        }
+
+        assertEquals(10, cluster.nodeLocalMap().get("locMapKey")); // Value put before the disconnect.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
index 19e40bf..7a2e8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
@@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes
             Object msg0 = ((GridIoMessage)msg).message();
 
             if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) {
-                info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id()
+                info("Sending message [locNodeId=" + ignite.cluster().localNode().id() +
+                    ", destNodeId= " + destNode.id()
                     + ", msg=" + msg + ']');
 
                 synchronized (msgCntMap) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..55fae9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -386,11 +386,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(1);
 
         ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
-            @Override public void apply(Socket sock) {
+            @Override
+            public void apply(Socket sock) {
                 try {
                     latch.await();
-                }
-                catch (InterruptedException e) {
+                } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -744,11 +744,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         attachListeners(1, 1);
 
         ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
-            @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+            @Override
+            public void apply(TcpDiscoveryAbstractMessage msg) {
                 try {
                     Thread.sleep(1000000);
-                }
-                catch (InterruptedException ignored) {
+                } catch (InterruptedException ignored) {
                     Thread.interrupted();
                 }
             }
@@ -778,7 +778,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         G.ignite("client-0").compute().broadcast(F.noop());
 
         assertTrue(GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
+            @Override
+            public boolean apply() {
                 return checkMetrics(3, 3, 1);
             }
         }, 10000));
@@ -788,7 +789,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         G.ignite("server-0").compute().broadcast(F.noop());
 
         assertTrue(GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
+            @Override
+            public boolean apply() {
                 return checkMetrics(3, 3, 2);
             }
         }, 10000));
@@ -886,7 +888,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         try {
             startClientNodes(1);
 
-            assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0")
+            assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode) G.ignite("client-0")
                 .cluster().localNode()).clientRouterNodeId());
 
             checkNodes(2, 1);
@@ -1193,7 +1195,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientsPerSrv = CLIENTS;
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
-            @Override public Void call() throws Exception {
+            @Override
+            public Void call() throws Exception {
                 Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
 
                 clientNodeIds.add(g.cluster().localNode().id());
@@ -1206,6 +1209,129 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFail() throws Exception {
+        reconnectAfterFail(false); // Topology is not changed after the client disconnects.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFailTopologyChanged() throws Exception {
+        reconnectAfterFail(true); // Topology is changed after the client disconnects.
+    }
+
+    /**
+     * Fails the client node from the server and checks that the client reconnects with a new order.
+     *
+     * @param changeTop If {@code true} topology is changed after client disconnects.
+     * @throws Exception If failed.
+     */
+    private void reconnectAfterFail(final boolean changeTop) throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi();
+
+        Ignite client = G.ignite("client-0");
+
+        final ClusterNode clientNode = client.cluster().localNode();
+
+        final UUID clientId = clientNode.id();
+
+        final TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+        assertEquals(2L, clientNode.order());
+
+        final CountDownLatch failLatch = new CountDownLatch(1);
+
+        final CountDownLatch joinLatch = new CountDownLatch(1);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Server event: " + evt);
+
+                DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+                if (evt0.eventNode().id().equals(clientId) && evt.type() == EVT_NODE_FAILED)
+                    failLatch.countDown();
+                else if (evt.type() == EVT_NODE_JOINED) {
+                    TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode();
+
+                    if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) {
+                        assertEquals(changeTop ? 5L : 4L, node.order());
+
+                        joinLatch.countDown();
+                    }
+                }
+
+                return true;
+            }
+        }, EVT_NODE_FAILED, EVT_NODE_JOINED);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Client event: " + evt);
+
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+
+                    if (changeTop)
+                        clientSpi.pauseAll();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(clientId, null);
+
+        if (changeTop) {
+            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+            srvNodeIds.add(g.cluster().localNode().id());
+
+            clientSpi.resumeAll();
+        }
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(failLatch.await(5000, MILLISECONDS));
+        assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+        long topVer = changeTop ? 5L : 4L;
+
+        assertEquals(topVer, client.cluster().localNode().order());
+
+        assertEquals(topVer, client.cluster().topologyVersion());
+
+        Collection<ClusterNode> clientTop = client.cluster().topology(topVer);
+
+        assertEquals(changeTop ? 3 : 2, clientTop.size());
+
+        clientNodeIds.remove(clientId); // The client's node ID before the reconnect.
+
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        checkNodes(changeTop ? 2 : 1, 1);
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.
@@ -1401,7 +1527,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private void checkRemoteNodes(Ignite ignite, int expCnt) {
         Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
 
-        assertEquals(expCnt, nodes.size());
+        assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size());
 
         for (ClusterNode node : nodes) {
             UUID id = node.id();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
new file mode 100644
index 0000000..7533a2c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Test suite for client reconnect tests.
+ */
+public class IgniteClientReconnectTestSuite extends TestSuite {
+    /**
+     * @return Suite containing the client reconnect test classes.
+     * @throws Exception In case of error.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite");
+
+        suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
+        suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
+        suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+
+        return suite;
+    }
+}


Mime
View raw message