ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [28/51] [abbrv] ignite git commit: ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback)
Date Wed, 09 Dec 2015 11:24:10 GMT
ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5791837
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5791837
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5791837

Branch: refs/heads/ignite-843-rc2
Commit: d5791837890a70e1777b86aab281245701afe1eb
Parents: 3b26859
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 8 12:42:25 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 8 12:42:25 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |  3 +-
 .../ignite/internal/GridPluginComponent.java    |  4 +-
 .../apache/ignite/internal/IgniteKernal.java    | 18 +++-
 .../internal/managers/GridManagerAdapter.java   |  5 +-
 .../deployment/GridDeploymentManager.java       |  5 +-
 .../processors/GridProcessorAdapter.java        |  5 +-
 .../processors/cache/GridCacheContext.java      |  6 +-
 .../processors/cache/GridCacheProcessor.java    | 26 +++++-
 .../datastructures/DataStructuresProcessor.java |  4 +-
 .../IgniteClientReconnectAbstractTest.java      | 95 +++++++++++++++++---
 .../IgniteClientReconnectAtomicsTest.java       | 57 ++++++++++++
 .../IgniteClientReconnectCacheTest.java         |  5 +-
 .../IgniteClientReconnectCollectionsTest.java   | 51 +++++++++++
 13 files changed, 254 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 6078c5d..0e234cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -131,6 +131,7 @@ public interface GridComponent {
      *
      * @param clusterRestarted Cluster restarted flag.
      * @throws IgniteCheckedException If failed.
+     * @return Future to wait before completing reconnect future.
      */
-    public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
+    @Nullable public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index ac2a3a7..89dc243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -70,8 +70,8 @@ public class GridPluginComponent implements GridComponent {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) {
-        // No-op.
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 87ccf93..ab62c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -136,6 +136,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -3083,16 +3084,27 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /**
      * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
      */
+    @SuppressWarnings("unchecked")
     public void onReconnected(final boolean clusterRestarted) {
         Throwable err = null;
 
         try {
             ctx.disconnected(false);
 
-            for (GridComponent comp : ctx.components())
-                comp.onReconnected(clusterRestarted);
+            GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+
+            for (GridComponent comp : ctx.components()) {
+                IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
+
+                if (fut != null)
+                    reconnectFut.add((IgniteInternalFuture)fut);
+            }
+
+            reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+
+            reconnectFut.markInitialized();
 
-            ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+            reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
                     try {
                         fut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1fd5bff..21a80c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
@@ -192,9 +193,11 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         for (T t : spis)
             t.onClientReconnected(clusterRestarted);
+
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index a2da75c..cea1786 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskName;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.protocol.gg.GridProtocolHandler;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -123,8 +124,10 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         storesOnKernalStart();
+
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index f7f42bd..e4896fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteFuture;
@@ -68,8 +69,8 @@ public abstract class GridProcessorAdapter implements GridProcessor {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
-        // No-op.
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index d689ba6..07f6b9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -242,7 +242,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     private boolean depEnabled;
 
     /** */
-    private boolean deferredDelete;
+    private boolean deferredDel;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -512,7 +512,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     public void cache(GridCacheAdapter<K, V> cache) {
         this.cache = cache;
 
-        deferredDelete = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
+        deferredDel = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
             (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC);
     }
 
@@ -576,7 +576,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return {@code True} if entries should not be deleted from cache immediately.
      */
     public boolean deferredDelete() {
-        return deferredDelete;
+        return deferredDel;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e53f186..02e6403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -955,10 +956,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
 
-        for (GridCacheAdapter cache : caches.values()) {
+        GridCompoundFuture<?, ?> stopFut = null;
+
+        for (final GridCacheAdapter cache : caches.values()) {
             String name = cache.name();
 
             boolean stopped;
@@ -985,8 +988,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 caches.remove(maskNull(cache.name()));
                 jCacheProxies.remove(maskNull(cache.name()));
 
-                onKernalStop(cache, true);
-                stopCache(cache, true);
+                IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        onKernalStop(cache, true);
+                        stopCache(cache, true);
+                    }
+                });
+
+                if (stopFut == null)
+                    stopFut = new GridCompoundFuture<>();
+
+                stopFut.add((IgniteInternalFuture)fut);
             }
             else {
                 cache.onReconnected();
@@ -1008,6 +1020,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cache.context().gate().reconnected(false);
 
         cachesOnDisconnect = null;
+
+        if (stopFut != null)
+            stopFut.markInitialized();
+
+        return stopFut;
     }
 
     /**
@@ -1200,6 +1217,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param pluginMgr Cache plugin manager.
      * @param cacheType Cache type.
      * @param cacheObjCtx Cache object context.
+     * @param updatesAllowed Updates allowed flag.
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 9ed9350..51c4067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -276,7 +276,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
             GridCacheRemovable obj = e.getValue();
 
@@ -291,6 +291,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
             cctx.dataStructures().onReconnected(clusterRestarted);
+
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0c1df7f..180047a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,6 +101,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      * @throws Exception If failed.
      */
     protected void waitReconnectEvent(CountDownLatch latch) throws Exception {
+        waitReconnectEvent(log, latch);
+    }
+
+    /**
+     * @param latch Latch.
+     * @throws Exception If failed.
+     */
+    protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception {
         if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) {
             log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
 
@@ -124,7 +134,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      * @param ignite Node.
      * @return Discovery SPI.
      */
-    protected TestTcpDiscoverySpi spi(Ignite ignite) {
+    protected static TestTcpDiscoverySpi spi(Ignite ignite) {
         return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
     }
 
@@ -201,18 +211,38 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      */
     protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
         throws Exception {
-        reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC);
+        reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
     }
 
     /**
      * Reconnect client node.
      *
+     * @param log  Logger.
+     * @param client Client.
+     * @param srv Server.
+     * @param disconnectedC Closure which will be run when client node disconnected.
+     * @throws Exception If failed.
+     */
+    public static void reconnectClientNode(IgniteLogger log,
+        Ignite client,
+        Ignite srv,
+        @Nullable Runnable disconnectedC)
+        throws Exception {
+        reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
+    }
+
+    /**
+     * Reconnect client node.
+     *
+     * @param log  Logger.
      * @param clients Clients.
      * @param srv Server.
      * @param disconnectedC Closure which will be run when client node disconnected.
      * @throws Exception If failed.
      */
-    protected void reconnectClientNodes(List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC)
+    protected static void reconnectClientNodes(final IgniteLogger log,
+        List<Ignite> clients, Ignite srv,
+        @Nullable Runnable disconnectedC)
         throws Exception {
         final TestTcpDiscoverySpi srvSpi = spi(srv);
 
@@ -227,12 +257,12 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
+                    log.info("Disconnected: " + evt);
 
                     disconnectLatch.countDown();
                 }
                 else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
+                    log.info("Reconnected: " + evt);
 
                     reconnectLatch.countDown();
                 }
@@ -247,7 +277,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         for (Ignite client : clients)
             srvSpi.failNode(client.cluster().localNode().id(), null);
 
-        waitReconnectEvent(disconnectLatch);
+        waitReconnectEvent(log, disconnectLatch);
 
         if (disconnectedC != null)
             disconnectedC.run();
@@ -257,13 +287,58 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         for (Ignite client : clients)
             spi(client).writeLatch.countDown();
 
-        waitReconnectEvent(reconnectLatch);
+        waitReconnectEvent(log, reconnectLatch);
 
         for (Ignite client : clients)
             client.events().stopLocalListen(p);
     }
 
     /**
+     * @param log Logger.
+     * @param client Client node.
+     * @param srvs Server nodes to stop.
+     * @param srvStartC Closure starting server nodes.
+     * @throws Exception If failed.
+     * @return Restarted servers.
+     */
+    public static Collection<Ignite> reconnectServersRestart(final IgniteLogger log,
+        Ignite client,
+        Collection<Ignite> srvs,
+        Callable<Collection<Ignite>> srvStartC)
+        throws Exception {
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        for (Ignite srv : srvs)
+            srv.close();
+
+        assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+        Collection<Ignite> startedSrvs = srvStartC.call();
+
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        return startedSrvs;
+    }
+
+    /**
      * @param e Client disconnected exception.
      * @return Reconnect future.
      */
@@ -303,7 +378,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     /**
      *
      */
-    protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
         volatile CountDownLatch writeLatch;
 
@@ -342,7 +417,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         private IgniteLogger log;
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             Class msgCls0 = msgCls;
 
@@ -356,7 +431,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 return;
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index c46b5c8..13cac81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
@@ -47,6 +49,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicsReconnectClusterRestart() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteAtomicLong atomicLong = client.atomicLong("atomicLong", 1L, true);
+        final IgniteAtomicReference<Integer> atomicRef = client.atomicReference("atomicRef", 1, true);
+        final IgniteAtomicStamped<Integer, Integer> atomicStamped = client.atomicStamped("atomicStamped", 1, 1, true);
+        final IgniteCountDownLatch latch = client.countDownLatch("latch", 1, true, true);
+        final IgniteAtomicSequence seq = client.atomicSequence("seq", 1L, true);
+
+        Ignite srv = grid(0);
+
+        reconnectServersRestart(log, client, Collections.singleton(srv), new Callable<Collection<Ignite>>() {
+            @Override public Collection<Ignite> call() throws Exception {
+                return Collections.singleton((Ignite)startGrid(0));
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                atomicStamped.compareAndSet(1, 1, 2, 2);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                atomicRef.compareAndSet(1, 2);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                atomicLong.incrementAndGet();
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                seq.getAndAdd(1L);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicSeqReconnect() throws Exception {
         Ignite client = grid(serverCount());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 14a770a..05da0b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -971,7 +971,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                     info("Disconnected: " + evt);
 
                     disconnectLatch.countDown();
-                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     info("Reconnected: " + evt);
 
                     reconnectLatch.countDown();
@@ -1096,7 +1097,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         for (int iter = 0; iter < 3; iter++) {
             log.info("Iteration: " + iter);
 
-            reconnectClientNodes(clients, grid(0), null);
+            reconnectClientNodes(log, clients, grid(0), null);
 
             for (Ignite client : clients) {
                 IgniteCache<Object, Object> cache = client.cache(null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index f6f038d..100e8de 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
@@ -49,6 +51,55 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     /**
      * @throws Exception If failed.
      */
+    public void testCollectionsReconnectClusterRestart() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteQueue<Object> queue = client.queue("q", 0, colCfg);
+        final IgniteSet<Object> set = client.set("s", colCfg);
+
+        Ignite srv = grid(0);
+
+        reconnectServersRestart(log, client, Collections.singleton(srv), new Callable<Collection<Ignite>>() {
+            @Override public Collection<Ignite> call() throws Exception {
+                return Collections.singleton((Ignite)startGrid(0));
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                queue.add(1);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                set.add(1);
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        try (IgniteQueue<Object> queue2 = client.queue("q", 0, colCfg)) {
+            queue2.add(1);
+        }
+
+        try (IgniteSet<Object> set2 = client.set("s", colCfg)) {
+            set2.add(1);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testQueueReconnect() throws Exception {
         CollectionConfiguration colCfg = new CollectionConfiguration();
 


Mime
View raw message