Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6FEA618107 for ; Wed, 9 Dec 2015 07:36:28 +0000 (UTC) Received: (qmail 2384 invoked by uid 500); 9 Dec 2015 07:36:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 2216 invoked by uid 500); 9 Dec 2015 07:36:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 889 invoked by uid 99); 9 Dec 2015 07:36:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Dec 2015 07:36:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8054CE08DD; Wed, 9 Dec 2015 07:36:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 09 Dec 2015 07:36:58 -0000 Message-Id: <80b2411486c34bcc89a2916376be1e1c@git.apache.org> In-Reply-To: <80de28b978cf43a99b96b55ff46d11dd@git.apache.org> References: <80de28b978cf43a99b96b55ff46d11dd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/50] [abbrv] ignite git commit: ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback) ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5791837 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5791837 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5791837 Branch: refs/heads/ignite-1.5.1 Commit: d5791837890a70e1777b86aab281245701afe1eb Parents: 3b26859 Author: sboikov Authored: Tue Dec 8 12:42:25 2015 +0300 Committer: sboikov Committed: Tue Dec 8 12:42:25 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 3 +- .../ignite/internal/GridPluginComponent.java | 4 +- .../apache/ignite/internal/IgniteKernal.java | 18 +++- .../internal/managers/GridManagerAdapter.java | 5 +- .../deployment/GridDeploymentManager.java | 5 +- .../processors/GridProcessorAdapter.java | 5 +- .../processors/cache/GridCacheContext.java | 6 +- .../processors/cache/GridCacheProcessor.java | 26 +++++- .../datastructures/DataStructuresProcessor.java | 4 +- .../IgniteClientReconnectAbstractTest.java | 95 +++++++++++++++++--- .../IgniteClientReconnectAtomicsTest.java | 57 ++++++++++++ .../IgniteClientReconnectCacheTest.java | 5 +- .../IgniteClientReconnectCollectionsTest.java | 51 +++++++++++ 13 files changed, 254 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 6078c5d..0e234cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -131,6 +131,7 @@ public interface GridComponent { * * @param clusterRestarted Cluster restarted flag. * @throws IgniteCheckedException If failed. + * @return Future to wait before completing reconnect future. */ - public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException; + @Nullable public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index ac2a3a7..89dc243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -70,8 +70,8 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) { - // No-op. + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) { + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 87ccf93..ab62c13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -136,6 +136,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridTimerTask; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFutureImpl; @@ -3083,16 +3084,27 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. */ + @SuppressWarnings("unchecked") public void onReconnected(final boolean clusterRestarted) { Throwable err = null; try { ctx.disconnected(false); - for (GridComponent comp : ctx.components()) - comp.onReconnected(clusterRestarted); + GridCompoundFuture reconnectFut = new GridCompoundFuture<>(); + + for (GridComponent comp : ctx.components()) { + IgniteInternalFuture fut = comp.onReconnected(clusterRestarted); + + if (fut != null) + reconnectFut.add((IgniteInternalFuture)fut); + } + + reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture()); + + reconnectFut.markInitialized(); - ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1>() { + reconnectFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture fut) { try { fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 1fd5bff..21a80c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject; @@ -192,9 +193,11 @@ public abstract class GridManagerAdapter implements GridMan } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException { for (T t : spis) t.onClientReconnected(clusterRestarted); + + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index a2da75c..cea1786 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -27,6 +27,7 @@ import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskName; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.deployment.protocol.gg.GridProtocolHandler; import org.apache.ignite.internal.processors.task.GridInternal; @@ -123,8 +124,10 @@ public class GridDeploymentManager extends GridManagerAdapter { } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException { storesOnKernalStart(); + + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index f7f42bd..e4896fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteFuture; @@ -68,8 +69,8 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - // No-op. + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index d689ba6..07f6b9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -242,7 +242,7 @@ public class GridCacheContext implements Externalizable { private boolean depEnabled; /** */ - private boolean deferredDelete; + private boolean deferredDel; /** * Empty constructor required for {@link Externalizable}. @@ -512,7 +512,7 @@ public class GridCacheContext implements Externalizable { public void cache(GridCacheAdapter cache) { this.cache = cache; - deferredDelete = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || + deferredDel = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC); } @@ -576,7 +576,7 @@ public class GridCacheContext implements Externalizable { * @return {@code True} if entries should not be deleted from cache immediately. */ public boolean deferredDelete() { - return deferredDelete; + return deferredDel; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e53f186..02e6403 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.F0; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -955,10 +956,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException { List reconnected = new ArrayList<>(caches.size()); - for (GridCacheAdapter cache : caches.values()) { + GridCompoundFuture stopFut = null; + + for (final GridCacheAdapter cache : caches.values()) { String name = cache.name(); boolean stopped; @@ -985,8 +988,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches.remove(maskNull(cache.name())); jCacheProxies.remove(maskNull(cache.name())); - onKernalStop(cache, true); - stopCache(cache, true); + IgniteInternalFuture fut = ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + onKernalStop(cache, true); + stopCache(cache, true); + } + }); + + if (stopFut == null) + stopFut = new GridCompoundFuture<>(); + + stopFut.add((IgniteInternalFuture)fut); } else { cache.onReconnected(); @@ -1008,6 +1020,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { cache.context().gate().reconnected(false); cachesOnDisconnect = null; + + if (stopFut != null) + stopFut.markInitialized(); + + return stopFut; } /** @@ -1200,6 +1217,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param pluginMgr Cache plugin manager. * @param cacheType Cache type. * @param cacheObjCtx Cache object context. + * @param updatesAllowed Updates allowed flag. * @return Cache context. * @throws IgniteCheckedException If failed to create cache. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 9ed9350..51c4067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -276,7 +276,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + @Override public IgniteInternalFuture onReconnected(boolean clusterRestarted) throws IgniteCheckedException { for (Map.Entry e : dsMap.entrySet()) { GridCacheRemovable obj = e.getValue(); @@ -291,6 +291,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) cctx.dataStructures().onReconnected(clusterRestarted); + + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 0c1df7f..180047a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal; import java.io.IOException; import java.net.Socket; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -99,6 +101,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra * @throws Exception If failed. */ protected void waitReconnectEvent(CountDownLatch latch) throws Exception { + waitReconnectEvent(log, latch); + } + + /** + * @param latch Latch. + * @throws Exception If failed. + */ + protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception { if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) { log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount()); @@ -124,7 +134,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra * @param ignite Node. * @return Discovery SPI. */ - protected TestTcpDiscoverySpi spi(Ignite ignite) { + protected static TestTcpDiscoverySpi spi(Ignite ignite) { return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); } @@ -201,18 +211,38 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra */ protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC) throws Exception { - reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC); + reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC); } /** * Reconnect client node. * + * @param log Logger. + * @param client Client. + * @param srv Server. + * @param disconnectedC Closure which will be run when client node disconnected. + * @throws Exception If failed. + */ + public static void reconnectClientNode(IgniteLogger log, + Ignite client, + Ignite srv, + @Nullable Runnable disconnectedC) + throws Exception { + reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC); + } + + /** + * Reconnect client node. + * + * @param log Logger. * @param clients Clients. * @param srv Server. * @param disconnectedC Closure which will be run when client node disconnected. * @throws Exception If failed. */ - protected void reconnectClientNodes(List clients, Ignite srv, @Nullable Runnable disconnectedC) + protected static void reconnectClientNodes(final IgniteLogger log, + List clients, Ignite srv, + @Nullable Runnable disconnectedC) throws Exception { final TestTcpDiscoverySpi srvSpi = spi(srv); @@ -227,12 +257,12 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra IgnitePredicate p = new IgnitePredicate() { @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); + log.info("Disconnected: " + evt); disconnectLatch.countDown(); } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); + log.info("Reconnected: " + evt); reconnectLatch.countDown(); } @@ -247,7 +277,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra for (Ignite client : clients) srvSpi.failNode(client.cluster().localNode().id(), null); - waitReconnectEvent(disconnectLatch); + waitReconnectEvent(log, disconnectLatch); if (disconnectedC != null) disconnectedC.run(); @@ -257,13 +287,58 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra for (Ignite client : clients) spi(client).writeLatch.countDown(); - waitReconnectEvent(reconnectLatch); + waitReconnectEvent(log, reconnectLatch); for (Ignite client : clients) client.events().stopLocalListen(p); } /** + * @param log Logger. + * @param client Client node. + * @param srvs Server nodes to stop. + * @param srvStartC Closure starting server nodes. + * @throws Exception If failed. + * @return Restarted servers. + */ + public static Collection reconnectServersRestart(final IgniteLogger log, + Ignite client, + Collection srvs, + Callable> srvStartC) + throws Exception { + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + for (Ignite srv : srvs) + srv.close(); + + assertTrue(disconnectLatch.await(30_000, MILLISECONDS)); + + Collection startedSrvs = srvStartC.call(); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + return startedSrvs; + } + + /** * @param e Client disconnected exception. * @return Reconnect future. */ @@ -303,7 +378,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra /** * */ - protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + public static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ volatile CountDownLatch writeLatch; @@ -342,7 +417,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra private IgniteLogger log; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) throws IgniteSpiException { Class msgCls0 = msgCls; @@ -356,7 +431,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra return; } - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index c46b5c8..13cac81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; @@ -47,6 +49,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr /** * @throws Exception If failed. */ + public void testAtomicsReconnectClusterRestart() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final IgniteAtomicLong atomicLong = client.atomicLong("atomicLong", 1L, true); + final IgniteAtomicReference atomicRef = client.atomicReference("atomicRef", 1, true); + final IgniteAtomicStamped atomicStamped = client.atomicStamped("atomicStamped", 1, 1, true); + final IgniteCountDownLatch latch = client.countDownLatch("latch", 1, true, true); + final IgniteAtomicSequence seq = client.atomicSequence("seq", 1L, true); + + Ignite srv = grid(0); + + reconnectServersRestart(log, client, Collections.singleton(srv), new Callable>() { + @Override public Collection call() throws Exception { + return Collections.singleton((Ignite)startGrid(0)); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + atomicStamped.compareAndSet(1, 1, 2, 2); + + return null; + } + }, IllegalStateException.class, null); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + atomicRef.compareAndSet(1, 2); + + return null; + } + }, IllegalStateException.class, null); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + atomicLong.incrementAndGet(); + + return null; + } + }, IllegalStateException.class, null); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + seq.getAndAdd(1L); + + return null; + } + }, IllegalStateException.class, null); + } + + /** + * @throws Exception If failed. + */ public void testAtomicSeqReconnect() throws Exception { Ignite client = grid(serverCount()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 14a770a..05da0b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -971,7 +971,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac info("Disconnected: " + evt); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); reconnectLatch.countDown(); @@ -1096,7 +1097,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac for (int iter = 0; iter < 3; iter++) { log.info("Iteration: " + iter); - reconnectClientNodes(clients, grid(0), null); + reconnectClientNodes(log, clients, grid(0), null); for (Ignite client : clients) { IgniteCache cache = client.cache(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index f6f038d..100e8de 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; @@ -49,6 +51,55 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA /** * @throws Exception If failed. */ + public void testCollectionsReconnectClusterRestart() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final IgniteQueue queue = client.queue("q", 0, colCfg); + final IgniteSet set = client.set("s", colCfg); + + Ignite srv = grid(0); + + reconnectServersRestart(log, client, Collections.singleton(srv), new Callable>() { + @Override public Collection call() throws Exception { + return Collections.singleton((Ignite)startGrid(0)); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + queue.add(1); + + return null; + } + }, IllegalStateException.class, null); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + set.add(1); + + return null; + } + }, IllegalStateException.class, null); + + try (IgniteQueue queue2 = client.queue("q", 0, colCfg)) { + queue2.add(1); + } + + try (IgniteSet set2 = client.set("s", colCfg)) { + set2.add(1); + } + } + + /** + * @throws Exception If failed. + */ public void testQueueReconnect() throws Exception { CollectionConfiguration colCfg = new CollectionConfiguration();