ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [01/11] ignite git commit: IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.
Date Thu, 27 Apr 2017 11:52:52 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-security-fixes [created] ea991cd28


IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.

(cherry picked from commit d124004)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a2b4751f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a2b4751f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a2b4751f

Branch: refs/heads/ignite-security-fixes
Commit: a2b4751f5eefd70a5a1aa26652c9671240125f78
Parents: 0ed4fda
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Fri Mar 17 14:57:48 2017 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Fri Mar 17 15:16:21 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalGatewayImpl.java  |   8 +-
 .../apache/ignite/internal/IgniteKernal.java    | 120 +++++-
 .../internal/IgniteNeedReconnectException.java  |  40 ++
 .../discovery/GridDiscoveryManager.java         |  24 ++
 .../GridCachePartitionExchangeManager.java      |  25 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  14 +-
 .../GridDhtPartitionsExchangeFuture.java        |  48 ++-
 .../service/GridServiceProcessor.java           |  86 ++---
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 201 ++++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   8 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   9 +
 .../IgniteClientReconnectCacheTest.java         |   7 +-
 .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  48 ++-
 .../IgniteClientReconnectTestSuite.java         |   2 +
 16 files changed, 929 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..036954a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -44,7 +44,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
 
     /** */
     @GridToStringExclude
-    private IgniteFutureImpl<?> reconnectFut;
+    private volatile IgniteFutureImpl<?> reconnectFut;
 
     /** */
     private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED);
@@ -149,6 +149,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
 
     /** {@inheritDoc} */
     @Override public GridFutureAdapter<?> onDisconnected() {
+        if (state.get() == GridKernalState.DISCONNECTED) {
+            assert reconnectFut != null;
+
+            return (GridFutureAdapter<?>)reconnectFut.internalFuture();
+        }
+
         GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
         reconnectFut = new IgniteFutureImpl<>(fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8fda72f..25f7884 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -250,6 +250,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /** Periodic starvation check interval. */
     private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
 
+    /** Force complete reconnect future. */
+    private static final Object STOP_RECONNECT = new Object();
+
     /** */
     @GridToStringExclude
     private GridKernalContextImpl ctx;
@@ -327,6 +330,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    /** */
+    private final ReconnectState reconnectState = new ReconnectState();
+
     /**
      * No-arg constructor is required by externalization.
      */
@@ -930,6 +936,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             // Notify IO manager the second so further components can send and receive messages.
             ctx.io().onKernalStart();
 
+            boolean recon = false;
+
             // Callbacks.
             for (GridComponent comp : ctx) {
                 // Skip discovery manager.
@@ -940,10 +948,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 if (comp instanceof GridIoManager)
                     continue;
 
-                if (!skipDaemon(comp))
-                    comp.onKernalStart();
+                if (!skipDaemon(comp)) {
+                    try {
+                        comp.onKernalStart();
+                    }
+                    catch (IgniteNeedReconnectException e) {
+                        assert ctx.discovery().reconnectSupported();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to start node components on node start, will wait for reconnect: " + e);
+
+                        recon = true;
+                    }
+                }
             }
 
+            if (recon)
+                reconnectState.waitFirstReconnect();
+
             // Register MBeans.
             registerKernalMBean();
             registerLocalNodeMBean();
@@ -3274,6 +3296,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     public void onDisconnected() {
         Throwable err = null;
 
+        reconnectState.waitPreviousReconnect();
+
         GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
 
         if (reconnectFut == null) {
@@ -3282,9 +3306,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             return;
         }
 
-        IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut);
+        IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture();
+
+        IgniteFuture<?> userFut;
 
-        ctx.cluster().get().clientReconnectFuture(userFut);
+        // In case of previous reconnect did not finish keep reconnect future.
+        if (curFut != null && curFut.internalFuture() == reconnectFut)
+            userFut = curFut;
+        else {
+            userFut = new IgniteFutureImpl<>(reconnectFut);
+
+            ctx.cluster().get().clientReconnectFuture(userFut);
+        }
 
         ctx.disconnected(true);
 
@@ -3337,30 +3370,53 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             ctx.disconnected(false);
 
-            GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+            GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>();
+
+            reconnectState.reconnectDone = new GridFutureAdapter<>();
 
             for (GridComponent comp : ctx.components()) {
                 IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
 
                 if (fut != null)
-                    reconnectFut.add((IgniteInternalFuture)fut);
+                    curReconnectFut.add(fut);
             }
 
-            reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+            curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture());
+
+            curReconnectFut.markInitialized();
 
-            reconnectFut.markInitialized();
+            final GridFutureAdapter reconnectDone = reconnectState.reconnectDone;
 
-            reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
+            curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
                     try {
-                        fut.get();
+                        Object res = fut.get();
+
+                        if (res == STOP_RECONNECT)
+                            return;
 
                         ctx.gateway().onReconnected();
+
+                        reconnectState.firstReconnectFut.onDone();
                     }
                     catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to reconnect, will stop node", e);
+                        if (!X.hasCause(e, IgniteNeedReconnectException.class,
+                            IgniteClientDisconnectedCheckedException.class)) {
+                            U.error(log, "Failed to reconnect, will stop node.", e);
+
+                            reconnectState.firstReconnectFut.onDone(e);
 
-                        close();
+                            close();
+                        }
+                        else {
+                            assert ctx.discovery().reconnectSupported();
+
+                            U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() +
+                                ", err=" + e.getMessage() + ']');
+                        }
+                    }
+                    finally {
+                        reconnectDone.onDone();
                     }
                 }
             });
@@ -3519,6 +3575,46 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     *
+     */
+    private class ReconnectState {
+        /** */
+        private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+
+        /** */
+        private GridCompoundFuture<?, Object> curReconnectFut;
+
+        /** */
+        private GridFutureAdapter<?> reconnectDone;
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        void waitFirstReconnect() throws IgniteCheckedException {
+            firstReconnectFut.get();
+        }
+
+        /**
+         *
+         */
+        void waitPreviousReconnect() {
+            if (curReconnectFut != null && !curReconnectFut.isDone()) {
+                assert reconnectDone != null;
+
+                curReconnectFut.onDone(STOP_RECONNECT);
+
+                try {
+                    reconnectDone.get();
+                }
+                catch (IgniteCheckedException ignote) {
+                    // No-op.
+                }
+            }
+
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
new file mode 100644
index 0000000..61ab576
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates that node should try reconnect to cluster.
+ */
+public class IgniteNeedReconnectException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param locNode Local node.
+     * @param cause Cause.
+     */
+    public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) {
+        super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause);
+
+        assert locNode.isClient();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..2ec1070 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -112,6 +112,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -1891,6 +1892,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @return {@code True} if local node client and discovery SPI supports reconnect.
+     */
+    public boolean reconnectSupported() {
+        DiscoverySpi spi = getSpi();
+
+        return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+            !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+    }
+
+    /**
+     * Leave cluster and try to join again.
+     *
+     * @throws IgniteSpiException If failed.
+     */
+    public void reconnect() {
+        assert reconnectSupported();
+
+        DiscoverySpi discoverySpi = getSpi();
+
+        ((TcpDiscoverySpi)discoverySpi).reconnect();
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7f11dc4..92142c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -447,6 +448,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     else
                         U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
                 }
+                catch (IgniteNeedReconnectException e) {
+                    throw e;
+                }
+                catch (Exception e) {
+                    if (fut.reconnectOnError(e))
+                        throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+                    throw e;
+                }
             }
 
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1690,6 +1700,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         dumpedObjects++;
                                     }
                                 }
+                                catch (Exception e) {
+                                    if (exchFut.reconnectOnError(e))
+                                        throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+                                    throw e;
+                                }
                             }
 
 
@@ -1829,7 +1845,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;
                 }
-                catch (IgniteClientDisconnectedCheckedException e) {
+                catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) {
+                    assert cctx.discovery().reconnectSupported();
+
+                    U.warn(log,"Local node failed to complete partition map exchange due to " +
+                        "network issues, will try to reconnect to cluster", e);
+
+                    cctx.discovery().reconnect();
+
                     return;
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..6425bc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -17,15 +17,16 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +204,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                         "continue to another node): " + node);
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log0, "Failed to request affinity assignment from remote node (will " +
-                        "continue to another node): " + node, e);
+                    if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) {
+                        onDone(new IgniteNeedReconnectException(ctx.localNode(), e));
+
+                        return;
+                    }
+
+                    U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+                        "continue to another node): " + node);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..d4f95e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -506,10 +508,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             throw e;
         }
+        catch (IgniteNeedReconnectException e) {
+            onDone(e);
+        }
         catch (Throwable e) {
-            U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else {
+                U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
 
-            onDone(e);
+                onDone(e);
+            }
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -1297,7 +1306,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
         catch (IgniteCheckedException e) {
-            onDone(e);
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else
+                onDone(e);
         }
     }
 
@@ -1314,8 +1326,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
-                log.debug("Failed to send full partition map to node, node left grid " +
-                    "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send full partition map to node, node left grid " +
+                        "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+
+                return;
+            }
+
+            if (reconnectOnError(e)) {
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
 
                 return;
             }
@@ -1641,6 +1660,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             }
                         }
                     }
+                    catch (Exception e) {
+                        if (reconnectOnError(e))
+                            onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+                        else
+                            throw e;
+                    }
                     finally {
                         leaveBusy();
                     }
@@ -1652,6 +1677,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    /**
+     * @param e Exception.
+     * @return {@code True} if local node should try reconnect in case of error.
+     */
+    public boolean reconnectOnError(Throwable e) {
+        return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+            cctx.discovery().reconnectSupported();
+    }
+
     /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6bcfd65..bd81518 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1498,60 +1498,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
-            // Retry forever.
-            try {
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+        // Retry forever.
+        try {
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer.equals(topVer))
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer.equals(topVer))
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                if (!newTopVer.equals(topVer)) {
-                    assert newTopVer.compareTo(topVer) > 0;
+            if (!newTopVer.equals(topVer)) {
+                assert newTopVer.compareTo(topVer) > 0;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 95e2cda..02ba56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -129,6 +129,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
 
+    /** */
+    private static final Object SPI_RECONNECT = "SPI_RECONNECT";
+
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
@@ -809,6 +812,11 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void reconnect() throws IgniteSpiException {
+        msgWorker.addMessage(SPI_RECONNECT);
+    }
+
+    /** {@inheritDoc} */
     @Override public void brakeConnection() {
         SocketStream sockStream = msgWorker.currSock;
 
@@ -879,9 +887,12 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private UUID rmtNodeId;
 
+        /** */
+        private CountDownLatch stopReadLatch;
+
         /**
          */
-        protected SocketReader() {
+        SocketReader() {
             super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
         }
 
@@ -889,7 +900,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param sockStream Socket.
          * @param rmtNodeId Rmt node id.
          */
-        public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
+        void setSocket(SocketStream sockStream, UUID rmtNodeId) {
             synchronized (mux) {
                 this.sockStream = sockStream;
 
@@ -899,6 +910,31 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @throws InterruptedException If interrupted.
+         */
+        private void forceStopRead() throws InterruptedException {
+            CountDownLatch stopReadLatch;
+
+            synchronized (mux) {
+                SocketStream stream = sockStream;
+
+                if (stream == null)
+                    return;
+
+                this.stopReadLatch = stopReadLatch = new CountDownLatch(1);
+
+                U.closeQuiet(stream.socket());
+
+                this.sockStream = null;
+                this.rmtNodeId = null;
+
+                mux.notifyAll();
+            }
+
+            stopReadLatch.await();
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
@@ -906,6 +942,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                 UUID rmtNodeId;
 
                 synchronized (mux) {
+                    if (stopReadLatch != null) {
+                        stopReadLatch.countDown();
+
+                        stopReadLatch = null;
+                    }
+
                     if (this.sockStream == null) {
                         mux.wait();
 
@@ -1007,18 +1049,21 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
-        private final long socketTimeout;
+        private final long sockTimeout;
 
         /** */
         private TcpDiscoveryAbstractMessage unackedMsg;
 
+        /** */
+        private CountDownLatch forceLeaveLatch;
+
         /**
          *
          */
-        protected SocketWriter() {
+        SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
 
-            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
         }
 
@@ -1034,6 +1079,29 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Sends {@link TcpDiscoveryNodeLeftMessage} and closes socket.
+         *
+         * @throws InterruptedException If interrupted.
+         */
+        private void forceLeave() throws InterruptedException {
+            CountDownLatch forceLeaveLatch;
+
+            synchronized (mux) {
+                // If writer was stopped.
+                if (sock == null)
+                    return;
+
+                this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1);
+
+                unackedMsg = null;
+
+                mux.notifyAll();
+            }
+
+            forceLeaveLatch.await();
+        }
+
+        /**
          * @param sock Socket.
          * @param clientAck {@code True} is server supports client message acknowlede.
          */
@@ -1089,13 +1157,41 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    msg = queue.poll();
+                    if (forceLeaveLatch != null) {
+                        msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
-                    if (msg == null) {
-                        mux.wait();
+                        msg.client(true);
+
+                        try {
+                            spi.writeToSocket(
+                                sock,
+                                msg,
+                                sockTimeout);
+                        }
+                        catch (IOException | IgniteCheckedException e) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg +
+                                    ", err=" + e.getMessage() + ']');
+                            }
+                        }
+
+                        U.closeQuiet(sock);
+
+                        this.sock = null;
+
+                        clear();
 
                         continue;
                     }
+                    else {
+                        msg = queue.poll();
+
+                        if (msg == null) {
+                            mux.wait();
+
+                            continue;
+                        }
+                    }
                 }
 
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
@@ -1115,7 +1211,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     spi.writeToSocket(
                         sock,
                         msg,
-                        socketTimeout);
+                        sockTimeout);
 
                     msg = null;
 
@@ -1165,10 +1261,30 @@ class ClientImpl extends TcpDiscoveryImpl {
                     synchronized (mux) {
                         if (sock == this.sock)
                             this.sock = null; // Connection has dead.
+
+                        clear();
                     }
                 }
             }
         }
+
+        /**
+         *
+         */
+        private void clear() {
+            assert Thread.holdsLock(mux);
+
+            queue.clear();
+            unackedMsg = null;
+
+            CountDownLatch forceLeaveLatch = this.forceLeaveLatch;
+
+            if (forceLeaveLatch != null) {
+                this.forceLeaveLatch = null;
+
+                forceLeaveLatch.countDown();
+            }
+        }
     }
 
     /**
@@ -1413,6 +1529,38 @@ class ClientImpl extends TcpDiscoveryImpl {
                         else
                             leaveLatch.countDown();
                     }
+                    else if (msg == SPI_RECONNECT) {
+                        if (state == CONNECTED) {
+                            if (reconnector != null) {
+                                reconnector.cancel();
+                                reconnector.join();
+
+                                reconnector = null;
+                            }
+
+                            sockWriter.forceLeave();
+                            sockReader.forceStopRead();
+
+                            currSock = null;
+
+                            queue.clear();
+
+                            onDisconnected();
+
+                            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+                            UUID newId = UUID.randomUUID();
+
+                            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
+                                "to network problems [newId=" + newId +
+                                ", prevId=" + locNode.id() +
+                                ", locNode=" + locNode+ ']');
+
+                            locNode.onClientDisconnected(newId);
+
+                            tryJoin();
+                        }
+                    }
                     else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
                         ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
                         TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1495,20 +1643,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                                         ", failMsg=" + forceFailMsg + ']');
                                 }
 
-                                state = DISCONNECTED;
-
-                                nodeAdded = false;
-
-                                IgniteClientDisconnectedCheckedException err =
-                                    new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
-                                    "client node disconnected.");
-
-                                for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
-                                    GridFutureAdapter<Boolean> fut = e.getValue();
-
-                                    if (pingFuts.remove(e.getKey(), fut))
-                                        fut.onDone(err);
-                                }
+                                onDisconnected();
 
                                 notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
                             }
@@ -1604,6 +1739,26 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         *
+         */
+        private void onDisconnected() {
+            state = DISCONNECTED;
+
+            nodeAdded = false;
+
+            IgniteClientDisconnectedCheckedException err =
+                new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+                    "client node disconnected.");
+
+            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                GridFutureAdapter<Boolean> fut = e.getValue();
+
+                if (pingFuts.remove(e.getKey(), fut))
+                    fut.onDone(err);
+            }
+        }
+
+        /**
          * @throws InterruptedException If interrupted.
          */
         private void tryJoin() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4600be0..afd1c2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1610,6 +1610,11 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void reconnect() throws IgniteSpiException {
+        throw new UnsupportedOperationException("Reconnect is not supported for server.");
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteSpiThread workerThread() {
         return msgWorker;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f199c20..84c2ff2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,6 +260,13 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * Leave cluster and try to join again.
+     *
+     * @throws IgniteSpiException If failed.
+     */
+    public abstract void reconnect() throws IgniteSpiException;
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      * <p>
      * Simulates this node failure by stopping service threads. So, node will become

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 00ae97d..a2a47fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1927,6 +1927,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * Force reconnect to cluster.
+     *
+     * @throws IgniteSpiException If failed.
+     */
+    public void reconnect() throws IgniteSpiException {
+        impl.reconnect();
+    }
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
     public int clientWorkerCount() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 0f0165b..6cdf465 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -700,9 +700,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                 try {
                     Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
 
-                    fail();
+                    // Commented due to IGNITE-4473, because
+                    // IgniteClientDisconnectedException won't
+                    // be thrown, but client will reconnect.
+//                    fail();
 
-                    return false;
+                    return true;
                 }
                 catch (IgniteClientDisconnectedException e) {
                     log.info("Expected start error: " + e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
new file mode 100644
index 0000000..a5d42e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests client to be able restore connection to cluster if coordination is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+    /** Block. */
+    private volatile boolean block;
+
+    /** Block all. */
+    private volatile boolean blockAll;
+
+    /** Coordinator. */
+    private volatile ClusterNode crd;
+
+    /** Client reconnect disabled. */
+    private boolean clientReconnectDisabled;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        clientReconnectDisabled = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.contains("client")) {
+            cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+            TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+            DiscoverySpi dspi = new DiscoverySpi();
+
+            dspi.setIpFinder(spi.getIpFinder());
+
+            cfg.setDiscoverySpi(dspi);
+
+            dspi.setJoinTimeout(60_000);
+            dspi.setClientReconnectDisabled(clientReconnectDisabled);
+
+            cfg.setClientMode(true);
+        }
+
+        // TODO: IGNITE-4833
+        cfg.setPeerClassLoadingEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnectAfterStart() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        List<Ignite> clientNodes = new ArrayList<>();
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++)
+            clientNodes.add(startGrid("client" + i));
+
+        blockAll = true;
+
+        GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                U.sleep(5_000);
+
+                block = true;
+                blockAll = false;
+
+                System.out.println(">>> Allow with blocked coordinator.");
+
+                latch.countDown();
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                latch.await();
+
+                U.sleep((new Random().nextInt(15) + 30) * 1000);
+
+                block = false;
+
+                System.out.println(">>> Allow coordinator.");
+
+                return null;
+            }
+        });
+
+        fut.get();
+
+        for (Ignite client : clientNodes) {
+            while (true) {
+                try {
+                    IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+                    for (int i = 0; i < 100; i++)
+                        cache.put(i, i);
+
+                    for (int i = 0; i < 100; i++)
+                        assertEquals((Integer)i, cache.get(i));
+
+                    cache.clear();
+
+                    break;
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    e.reconnectFuture().get();
+                }
+            }
+        }
+
+        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnect() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true;
+
+        List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    latch.await();
+
+                    return startGrid("client" + idx);
+                }
+            });
+
+            futs.add(fut);
+        }
+
+        GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                latch.countDown();
+
+                Random rnd = new Random();
+
+                U.sleep((rnd.nextInt(15) + 15) * 1000);
+
+                block = false;
+
+                System.out.println(">>> ALLOW connection to coordinator.");
+
+                return true;
+            }
+        });
+
+        for (IgniteInternalFuture<Ignite> clientFut : futs) {
+            Ignite client = clientFut.get();
+
+            IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            for (int i = 0; i < 100; i++)
+                assert i == cache.get(i);
+        }
+
+        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsReconnectDisabled() throws Exception {
+        clientReconnectDisabled = true;
+
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true;
+
+        List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    latch.await();
+
+                    return startGrid("client" + idx);
+                }
+            });
+
+            futs.add(fut);
+        }
+
+        latch.countDown();
+
+        for (final IgniteInternalFuture<Ignite> clientFut : futs) {
+            //noinspection ThrowableNotThrown
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    clientFut.get();
+
+                    return null;
+                }
+            }, IgniteCheckedException.class, null);
+        }
+
+        assertEquals(0, srv1.cluster().forClients().nodes().size());
+        assertEquals(0, srv2.cluster().forClients().nodes().size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60_000;
+    }
+
+    /**
+     *
+     */
+    private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (blockAll || block && node.id().equals(crd.id()))
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (blockAll || block && node.id().equals(crd.id()))
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+            if (blockAll || block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            return super.openSocket(sock, remAddr, timeoutHelper);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 331b581..0483a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteState;
@@ -43,6 +44,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -1788,8 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientNodeIds.add(client.cluster().localNode().id());
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override
-            public boolean apply() {
+            @Override public boolean apply() {
                 return srv.cluster().nodes().size() == 2;
             }
         }, awaitTime());
@@ -1800,6 +1801,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testForceClientReconnect() throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+        IgniteKernal client = (IgniteKernal)G.ignite("client-0");
+
+        UUID clientId = F.first(clientNodeIds);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        srv.events().enableLocal(EVT_NODE_JOINED);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                latch.countDown();
+
+                return false;
+            }
+        }, EVT_NODE_JOINED);
+
+        client.context().discovery().reconnect();
+
+        assert latch.await(10, TimeUnit.SECONDS);
+
+        while (true) {
+            try {
+                UUID newId = client.localNode().id();
+
+                assert !clientId.equals(newId) : clientId;
+
+                break;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                e.reconnectFuture().get(10_000);
+            }
+        }
+    }
+
+    /**
      * @param ignite Ignite.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a2b4751f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index ea8e37b..67d88e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest;
 import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
 import org.apache.ignite.internal.IgniteClientReconnectStopTest;
 import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
 
 /**
  *
@@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectServicesTest.class);
         suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
         suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+        suite.addTestSuite(IgniteClientRejoinTest.class);
 
         return suite;
     }


Mime
View raw message