ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [2/4] ignite git commit: IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.
Date Tue, 07 Mar 2017 16:15:57 GMT
IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e5a5f9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e5a5f9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e5a5f9d

Branch: refs/heads/ignite-4473-1
Commit: 8e5a5f9d5cc4e4cd3d7d1e278d9a6a85dacfd706
Parents: 9df5e94
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Fri Mar 3 17:28:27 2017 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Fri Mar 3 17:28:27 2017 +0300

----------------------------------------------------------------------
 .../IgniteCouldReconnectCheckedException.java   |  33 +++
 .../ignite/internal/GridKernalGateway.java      |   7 +
 .../ignite/internal/GridKernalGatewayImpl.java  |  14 +
 .../apache/ignite/internal/IgniteKernal.java    |  45 ++-
 .../discovery/GridDiscoveryManager.java         |  11 +
 .../GridCachePartitionExchangeManager.java      |  23 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  13 +-
 .../GridDhtPartitionsExchangeFuture.java        |  14 +-
 .../service/GridServiceProcessor.java           |  86 +++---
 .../ignite/spi/discovery/DiscoverySpi.java      |  10 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 157 ++++++++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   6 +
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  10 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +
 .../ignite/internal/IgniteClientRejoinTest.java | 281 +++++++++++++++++++
 .../cluster/GridUpdateNotifierSelfTest.java     |   2 +
 .../IgniteClientReconnectTestSuite.java         |   2 +
 17 files changed, 657 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
new file mode 100644
index 0000000..62acaee
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCouldReconnectCheckedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates that an operation failed but the node could be rejoined to the cluster.
+ */
+public class IgniteCouldReconnectCheckedException extends IgniteCheckedException {
+    /**
+     * @param msg Message.
+     * @param cause Cause.
+     */
+    public IgniteCouldReconnectCheckedException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 1b9da2f..da52d09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -123,4 +123,11 @@ public interface GridKernalGateway {
      * Reconnected callback.
      */
     public void onReconnected();
+
+    /**
+     * Reconnect failed callback.
+     *
+     * @param t Cause.
+     */
+    public void onReconnectFailed(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..84bb886 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -23,6 +23,7 @@ import java.io.StringWriter;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -151,9 +152,16 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
     @Override public GridFutureAdapter<?> onDisconnected() {
         GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
+        IgniteFutureImpl reconnectFut0 = reconnectFut;
+
         reconnectFut = new IgniteFutureImpl<>(fut);
 
         if (!state.compareAndSet(GridKernalState.STARTED, GridKernalState.DISCONNECTED)) {
+            Throwable error = reconnectFut0.internalFuture().error();
+
+            if (error instanceof IgniteCouldReconnectCheckedException)
+                return fut;
+
             ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped."));
 
             return null;
@@ -168,6 +176,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
             ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone();
     }
 
+    /** {@inheritDoc} */
+    @Override public void onReconnectFailed(Throwable t) {
+        if (state.get() == GridKernalState.DISCONNECTED)
+            ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(t);
+    }
+
     /**
      * Retrieves user stack trace.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4972d1f..1fb5b57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -54,6 +54,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
@@ -780,6 +781,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         List<PluginProvider> plugins = U.allPluginProviders();
 
+        boolean recon = false;
+
         // Spin out SPIs & managers.
         try {
             ctx = new GridKernalContextImpl(log,
@@ -946,8 +949,35 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 if (comp instanceof GridIoManager)
                     continue;
 
-                if (!skipDaemon(comp))
-                    comp.onKernalStart();
+                if (!skipDaemon(comp)) {
+                    try {
+                        comp.onKernalStart();
+                    }
+                    catch (IgniteCouldReconnectCheckedException e) {
+                        recon = true;
+                    }
+                }
+            }
+
+            while (recon) {
+                try {
+                    ctx.discovery().rejoin().get(); // TODO timeout?
+
+                    IgniteFuture<?> reconFut = ctx.cluster().clientReconnectFuture();
+
+                    reconFut.get();
+
+                    recon = false;
+                }
+                catch (IgniteException e) {
+                    if (X.hasCause(e, IgniteCouldReconnectCheckedException.class, IgniteClientDisconnectedException.class)) {
+                        log.warning("Rejoin failed, retry. locNodeId=" + ctx.localNodeId() + ']');
+
+                        continue;
+                    }
+
+                    throw e;
+                }
             }
 
             // Register MBeans.
@@ -3393,9 +3423,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                         ctx.gateway().onReconnected();
                     }
                     catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to reconnect, will stop node", e);
+                        if (!X.hasCause(e, IgniteCouldReconnectCheckedException.class, IgniteClientDisconnectedCheckedException.class)) {
+                            U.error(log, "Failed to reconnect, will stop node 2", e);
 
-                        close();
+                            close();
+                        }
+                        else {
+                            U.error(log, "Failed to reconnect, retry", e);
+
+                            ctx.gateway().onReconnectFailed(e);
+                        }
                     }
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..9efc428 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1891,6 +1891,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Leave cluster and try to join again.
+     *
+     * @return Future that completes successfully once the node has joined the cluster,
+     * or fails with an exception if the rejoin failed.
+     * @throws IgniteSpiException If failed.
+     */
+    public IgniteInternalFuture<Object> rejoin() {
+        return getSpi().rejoin();
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7cf75fe..0dd1a58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -87,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -448,6 +451,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     else
                         U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
                 }
+                catch (Exception e) {
+                    if (cctx.localNode().isClient() && X.hasCause(e, IOException.class))
+                        throw new IgniteCouldReconnectCheckedException("Reconnect", e);
+
+                    throw e;
+                }
             }
 
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1697,6 +1706,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         dumpedObjects++;
                                     }
                                 }
+                                catch (Exception e) {
+                                    if (cctx.localNode().isClient() && X.hasCause(e, IOException.class))
+                                        throw new IgniteCouldReconnectCheckedException("Reconnect", e);
+
+                                    throw e;
+                                }
                             }
 
 
@@ -1836,8 +1851,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;
                 }
-                catch (IgniteClientDisconnectedCheckedException e) {
-                    return;
+                catch (IgniteClientDisconnectedCheckedException | IgniteCouldReconnectCheckedException e) {
+                    if (!cctx.localNode().isClient()) {
+                        U.error(log, "Ignore exception", e);
+
+                        return;
+                    }
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to wait for completion of partition map exchange " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..bc0adda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +203,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                         "continue to another node): " + node);
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log0, "Failed to request affinity assignment from remote node (will " +
-                        "continue to another node): " + node, e);
+                    if (log0.isDebugEnabled() || !X.hasCause(e, IOException.class)) {
+                        U.error(log0, "Failed to request affinity assignment from remote node (will " +
+                            "continue to another node): " + node, e);
+                    }
+                    else {
+                        U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+                            "continue to another node): " + node);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..96389d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -509,7 +511,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         catch (Throwable e) {
             U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
 
-            onDone(e);
+            if (cctx.localNode().isClient() && X.hasCause(e,
+                IOException.class,
+                IgniteClientDisconnectedCheckedException.class))
+                onDone(new IgniteCouldReconnectCheckedException("Local node could be reconnected. [locNodeId="
+                    + cctx.localNodeId() + ']', e));
+            else
+                onDone(e);
 
             if (e instanceof Error)
                 throw (Error)e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..43918fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1508,60 +1508,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
-            // Retry forever.
-            try {
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+        // Retry forever.
+        try {
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer.equals(topVer))
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer.equals(topVer))
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                if (!newTopVer.equals(topVer)) {
-                    assert newTopVer.compareTo(topVer) > 0;
+            if (!newTopVer.equals(topVer)) {
+                assert newTopVer.compareTo(topVer) > 0;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 8c23d92..9ee3777 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -128,6 +129,15 @@ public interface DiscoverySpi extends IgniteSpi {
     public void disconnect() throws IgniteSpiException;
 
     /**
+     * Leave cluster and try to join again.
+     *
+     * @return Future that completes successfully once the node has joined the cluster,
+     * or fails with an exception if the rejoin failed.
+     * @throws IgniteSpiException If failed.
+     */
+    public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException;
+
+    /**
      * Sets discovery SPI node authenticator. This method is called before SPI start() method.
      *
      * @param auth Discovery SPI authenticator.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 39c539c..bbbd4c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -127,6 +128,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
 
+    /** */
+    private static final Object SPI_REJOIN = "SPI_REJOIN";
+
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
@@ -163,6 +167,12 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private final Timer timer = new Timer("TcpDiscoverySpi.timer");
 
+    /** Rejoin future. */
+    private GridFutureAdapter<Object> rejoinFut;
+
+    /** Rejoin mutex. */
+    private final Object rejoinMux = new Object();
+
     /** */
     protected MessageWorker msgWorker;
 
@@ -789,6 +799,24 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException {
+        synchronized (rejoinMux) {
+            if (rejoinFut != null)
+                return rejoinFut;
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Rejoining to cluster, [localNodeId=" + getLocalNodeId() + ']');
+
+                rejoinFut = new GridFutureAdapter<>();
+
+                msgWorker.addMessage(SPI_REJOIN);
+
+                return rejoinFut;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void brakeConnection() {
         SocketStream sockStream = msgWorker.currSock;
 
@@ -987,18 +1015,21 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
 
         /** */
-        private final long socketTimeout;
+        private final long sockTimeout;
 
         /** */
         private TcpDiscoveryAbstractMessage unackedMsg;
 
+        /** */
+        private GridFutureAdapter<Object> forceFut;
+
         /**
          *
          */
         protected SocketWriter() {
             super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
 
-            socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
         }
 
@@ -1014,6 +1045,26 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         *
+         */
+        private void forceLeave() {
+            synchronized (mux) {
+                forceFut = new GridFutureAdapter<>();
+
+                unackedMsg = null;
+
+                mux.notifyAll();
+            }
+
+            try {
+                forceFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSpiException(e);
+            }
+        }
+
+        /**
          * @param sock Socket.
          * @param clientAck {@code True} is server supports client message acknowlede.
          */
@@ -1069,12 +1120,19 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    msg = queue.poll();
+                    if (forceFut != null) {
+                        msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
-                    if (msg == null) {
-                        mux.wait();
+                        msg.client(true);
+                    }
+                    else {
+                        msg = queue.poll();
 
-                        continue;
+                        if (msg == null) {
+                            mux.wait();
+
+                            continue;
+                        }
                     }
                 }
 
@@ -1095,7 +1153,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                     spi.writeToSocket(
                         sock,
                         msg,
-                        socketTimeout);
+                        sockTimeout);
+
+                    if (forceFut != null)
+                        throw new IgniteCheckedException("Force fail local node.");
 
                     msg = null;
 
@@ -1137,7 +1198,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (log.isDebugEnabled())
                             log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
                     }
-                    else
+                    else if (forceFut == null)
                         U.error(log, "Failed to send message: " + msg, e);
 
                     U.closeQuiet(sock);
@@ -1145,6 +1206,15 @@ class ClientImpl extends TcpDiscoveryImpl {
                     synchronized (mux) {
                         if (sock == this.sock)
                             this.sock = null; // Connection has dead.
+
+                        if (forceFut != null) {
+                            queue.clear();
+                            unackedMsg = null;
+
+                            forceFut.onDone();
+
+                            forceFut = null;
+                        }
                     }
                 }
             }
@@ -1393,6 +1463,59 @@ class ClientImpl extends TcpDiscoveryImpl {
                         else
                             leaveLatch.countDown();
                     }
+                    else if (msg == SPI_REJOIN) {
+                        if (reconnector != null) {
+                            reconnector.cancel();
+                            reconnector.join();
+
+                            reconnector = null;
+                        }
+
+                        if (state == CONNECTED) {
+                            state = DISCONNECTED;
+
+                            nodeAdded = false;
+
+                            IgniteClientDisconnectedCheckedException err =
+                                new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+                                    "client node disconnected.");
+
+                            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                                GridFutureAdapter<Boolean> fut = e.getValue();
+
+                                if (pingFuts.remove(e.getKey(), fut))
+                                    fut.onDone(err);
+                            }
+
+                            sockWriter.forceLeave();
+                            sockReader.setSocket(null, null);
+                            currSock = null;
+
+                            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+                        }
+
+                        if (state == DISCONNECTED) {
+                            UUID newId = UUID.randomUUID();
+
+                            locNode.onClientDisconnected(newId);
+
+                            tryJoin();
+                        }
+
+                        synchronized (rejoinMux) {
+                            ClientImpl.State state0 = state;
+
+                            if (rejoinFut != null &&
+                                (state0 == STARTING || state0 == STOPPED || state0 == SEGMENTED)) {
+                                rejoinFut.onDone(new IgniteCheckedException(
+                                    "Cannot perform rejoin on incompatible state: " + state0));
+
+                                rejoinFut = null;
+                            }
+                            else
+                                assert rejoinFut != null : "Rejoin future cannot be null.";
+                        }
+                    }
                     else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
                         ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
                         TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1566,6 +1689,16 @@ class ClientImpl extends TcpDiscoveryImpl {
                     }
                 }
             }
+            catch (Throwable t) {
+                synchronized (rejoinMux) {
+                    if (rejoinFut != null)
+                        rejoinFut.onDone(t);
+
+                    rejoinFut = null;
+                }
+
+                throw t;
+            }
             finally {
                 SocketStream currSock = this.currSock;
 
@@ -1793,6 +1926,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                     joinErr.set(null);
 
                     joinLatch.countDown();
+
+                    synchronized (rejoinMux) {
+                        if (rejoinFut != null) {
+                            rejoinFut.onDone();
+
+                            rejoinFut = null;
+                        }
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Discarding node add finished message (this message has already been processed) " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 93978ac..a13a70f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
@@ -1591,6 +1592,11 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture rejoin() throws IgniteSpiException {
+        throw new UnsupportedOperationException("Rejoin is not supported for server.");
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteSpiThread workerThread() {
         return msgWorker;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f199c20..a4ab772 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,6 +260,15 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * Leave cluster and try to join again.
+     *
+     * @return Future that completes successfully once the node has rejoined the cluster,
+     * or completes with an exception if the rejoin failed.
+     * @throws IgniteSpiException If failed.
+     */
+    public abstract IgniteInternalFuture<Object> rejoin() throws IgniteSpiException;
+
+    /**
      * <strong>FOR TEST ONLY!!!</strong>
      * <p>
      * Simulates this node failure by stopping service threads. So, node will become

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 45933e1..e063d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -52,6 +52,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1921,6 +1922,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         return ignite().configuration().getSslContextFactory() != null;
     }
 
+    /** {@inheritDoc} */
+    public IgniteInternalFuture<Object> rejoin() throws IgniteSpiException {
+        return impl.rejoin();
+    }
+
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
new file mode 100644
index 0000000..d91bed6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that a client is able to restore its connection to the cluster if the coordinator is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+    /** Block. */
+    private volatile boolean block;
+
+    /** Coordinator. */
+    private volatile ClusterNode crd;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.contains("client")) {
+            cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+            TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+            DiscoverySpi dspi = new DiscoverySpi();
+
+            dspi.setIpFinder(spi.getIpFinder());
+
+            cfg.setDiscoverySpi(dspi);
+
+            cfg.setClientMode(true);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true;
+
+        IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                Random rnd = new Random();
+
+                U.sleep((rnd.nextInt(15) + 30) * 1000);
+
+                block = false;
+
+                System.out.println("ALLOW connection to coordinator.");
+
+                return true;
+            }
+        });
+
+        Ignite client = startGrid("client");
+
+
+        assert fut.get();
+
+        IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        for (int i = 0; i < 100; i++)
+            assert i == cache.get(i);
+
+        Collection<ClusterNode> clients = client.cluster().forClients().nodes();
+
+        assertEquals("Clients: " + clients, 1, clients.size());
+        assertEquals(1, srv1.cluster().forClients().nodes().size());
+        assertEquals(1, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testManyClientsReconnect() throws Exception {
+        Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        block = true;
+
+        List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final int CLIENTS_NUM = 5;
+
+        for (int i = 0; i < CLIENTS_NUM; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+                @Override public Ignite call() throws Exception {
+                    latch.await();
+
+                    return startGrid("client" + idx);
+                }
+            });
+
+            futs.add(fut);
+        }
+
+        GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                latch.countDown();
+
+                Random rnd = new Random();
+
+                U.sleep((rnd.nextInt(15) + 15) * 1000);
+
+                block = false;
+
+                System.out.println(">>> ALLOW connection to coordinator.");
+
+                return true;
+            }
+        });
+
+        for (IgniteInternalFuture<Ignite> clientFut : futs) {
+            Ignite client = clientFut.get();
+
+            IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            for (int i = 0; i < 100; i++)
+                assert i == cache.get(i);
+        }
+
+        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 2 * 60_000;
+    }
+
+    /**
+     *
+     */
+    private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (block && node.id().equals(crd.id()))
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (block && node.id().equals(crd.id()))
+                throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            if (block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+            if (block && sock.getPort() == 47500)
+                throw new SocketException("Test discovery exception");
+
+            return super.openSocket(sock, remAddr, timeoutHelper);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
index 21b91b6..4e3977c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/GridUpdateNotifierSelfTest.java
@@ -133,6 +133,8 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
             return null;
         }
 
+        @Override public void onReconnectFailed(Throwable t) {}
+
         @Override public void onReconnected() {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5a5f9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index ea8e37b..67d88e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest;
 import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
 import org.apache.ignite.internal.IgniteClientReconnectStopTest;
 import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
 
 /**
  *
@@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectServicesTest.class);
         suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
         suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+        suite.addTestSuite(IgniteClientRejoinTest.class);
 
         return suite;
     }


Mime
View raw message