Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 291BE18777 for ; Wed, 11 Nov 2015 00:10:17 +0000 (UTC) Received: (qmail 83109 invoked by uid 500); 11 Nov 2015 00:10:17 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 83066 invoked by uid 500); 11 Nov 2015 00:10:17 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 82879 invoked by uid 99); 11 Nov 2015 00:10:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 00:10:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7670E0615; Wed, 11 Nov 2015 00:10:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: raulk@apache.org To: commits@ignite.apache.org Date: Wed, 11 Nov 2015 00:10:33 -0000 Message-Id: <43bf44f39268475aad8dfd1352729c1b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/50] [abbrv] ignite git commit: ignite-1758 Fixed client reconnect issues ignite-1758 Fixed client reconnect issues Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/627510c3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/627510c3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/627510c3 Branch: refs/heads/ignite-1527 Commit: 627510c398e57bbe02d40cc83a970154030b5494 Parents: cb1a334 Author: sboikov Authored: Tue Nov 3 17:42:01 2015 +0300 Committer: Raul Kripalani Committed: Wed Nov 11 00:09:41 2015 +0000 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 6 ++ .../ignite/spi/discovery/tcp/ServerImpl.java | 21 +++-- .../TcpDiscoveryClientReconnectMessage.java | 1 + .../messages/TcpDiscoveryDiscardMessage.java | 1 + ...gniteClientReconnectMassiveShutdownTest.java | 84 +++++++++++++------- .../tcp/TcpDiscoveryMultiThreadedTest.java | 5 +- 6 files changed, 79 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 77e47a7..cef38e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1447,6 +1447,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter top = new ArrayList<>(allNodes.size()); for (TcpDiscoveryNode n0 : allNodes) { - if (n0.internalOrder() != 0 && n0.internalOrder() < node.internalOrder()) + assert n0.internalOrder() > 0 : n0; + + if (n0.internalOrder() < node.internalOrder()) top.add(n0); } @@ -3239,6 +3241,9 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { + if (isLocalNodeCoordinator()) + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); + if (isLocNodeRouter) { ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); @@ -3249,7 +3254,7 @@ class ServerImpl extends TcpDiscoveryImpl { locNodeId + ", clientNodeId=" + nodeId + ']'); } else { - if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId())) + if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) sendMessageAcrossRing(msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index c232e6c..7c0cd5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -26,6 +26,7 @@ import org.apache.ignite.lang.IgniteUuid; /** * Message telling that client node is reconnecting to topology. */ +@TcpDiscoveryEnsureDelivery public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java index 145f19e..4b4eb9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java @@ -40,6 +40,7 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { * * @param creatorNodeId Creator node ID. * @param msgId Message ID. + * @param customMsgDiscard Flag indicating whether the ID to discard is for a custom message or not. */ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) { super(creatorNodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java index 6f0e887..5282cf2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java @@ -22,6 +22,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; @@ -35,12 +36,14 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -131,7 +134,7 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract assertTrue(client.configuration().isClientMode()); - CacheConfiguration cfg = new CacheConfiguration<>(); + final CacheConfiguration cfg = new CacheConfiguration<>(); cfg.setCacheMode(PARTITIONED); cfg.setAtomicityMode(TRANSACTIONAL); @@ -141,6 +144,8 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract IgniteCache cache = client.getOrCreateCache(cfg); + assertNotNull(cache); + HashMap put = new HashMap<>(); // Load some data. @@ -155,59 +160,80 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) clientIdx.add(i); + final CountDownLatch latch = new CountDownLatch(CLIENT_GRID_CNT); + IgniteInternalFuture clientsFut = multithreadedAsync( new Callable() { @Override public Object call() throws Exception { - int idx = clientIdx.take(); + try { + int idx = clientIdx.take(); - Ignite ignite = grid(idx); + Ignite ignite = grid(idx); - Thread.currentThread().setName("client-thread-" + ignite.name()); + Thread.currentThread().setName("client-thread-" + ignite.name()); - assertTrue(ignite.configuration().isClientMode()); + assertTrue(ignite.configuration().isClientMode()); - IgniteCache cache = ignite.cache(null); + IgniteCache cache = ignite.getOrCreateCache(cfg); - IgniteTransactions txs = ignite.transactions(); + assertNotNull(cache); - Random rand = new Random(); + IgniteTransactions txs = ignite.transactions(); - while (!done.get()) { - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); + Random rand = new Random(); - tx.commit(); - } - catch (ClusterTopologyException ex) { - ex.retryReadyFuture().get(); - } - catch (IgniteException | CacheException e) { - if (X.hasCause(e, IgniteClientDisconnectedException.class)) { - IgniteClientDisconnectedException cause = X.cause(e, - IgniteClientDisconnectedException.class); + latch.countDown(); - assert cause != null; + while (!done.get()) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); - cause.reconnectFuture().get(); + tx.commit(); } - else if (X.hasCause(e, ClusterTopologyException.class)) { - ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + catch (ClusterTopologyException ex) { + ex.retryReadyFuture().get(); + } + catch (IgniteException | CacheException e) { + if (X.hasCause(e, IgniteClientDisconnectedException.class)) { + IgniteClientDisconnectedException cause = X.cause(e, + IgniteClientDisconnectedException.class); + + assert cause != null; + + cause.reconnectFuture().get(); + } + else if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); - assert cause != null; + assert cause != null; - cause.retryReadyFuture().get(); + cause.retryReadyFuture().get(); + } + else + throw e; } - else - throw e; } + + return null; } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - return null; + throw e; + } } }, CLIENT_GRID_CNT, "client-thread"); try { + if (!latch.await(30, SECONDS)) { + log.warning("Failed to wait for for clients start."); + + U.dumpThreads(log); + + fail("Failed to wait for for clients start."); + } + // Killing a half of server nodes. final int srvsToKill = GRID_CNT / 2; http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 09b3ef8..55474dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.client.util.GridConcurrentHashSet; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -161,8 +162,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultiThreadedClientsRestart() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1123"); - final AtomicBoolean done = new AtomicBoolean(); try { @@ -271,6 +270,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || X.hasCause(e, IgniteClientDisconnectedException.class)) log.info("Client disconnected: " + e); + else if (X.hasCause(e, ClusterTopologyCheckedException.class)) + log.info("Client failed to start: " + e); else { if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) log.info("Client failed: " + e);