Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7A28F18818 for ; Tue, 10 Nov 2015 13:57:43 +0000 (UTC) Received: (qmail 76034 invoked by uid 500); 10 Nov 2015 13:57:43 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 75998 invoked by uid 500); 10 Nov 2015 13:57:43 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 75989 invoked by uid 99); 10 Nov 2015 13:57:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2015 13:57:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 25B6ADFD86; Tue, 10 Nov 2015 13:57:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-1758 Discovery fixes. Date: Tue, 10 Nov 2015 13:57:43 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-1758-debug e7723fe16 -> bf9cf422b ignite-1758 Discovery fixes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf9cf422 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf9cf422 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf9cf422 Branch: refs/heads/ignite-1758-debug Commit: bf9cf422b0384af44f94afdbd97cf5cda84e1f5c Parents: e7723fe Author: sboikov Authored: Tue Nov 10 16:17:07 2015 +0300 Committer: sboikov Committed: Tue Nov 10 16:34:39 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 94 +++++- .../tcp/internal/TcpDiscoveryNodesRing.java | 2 +- .../messages/TcpDiscoveryAbstractMessage.java | 31 ++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 156 ++++++---- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 293 ++++++++++++++++++- .../testframework/junits/GridAbstractTest.java | 15 +- 6 files changed, 502 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0fe2881..0233435 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -191,10 +191,10 @@ class ServerImpl extends TcpDiscoveryImpl { private StatisticsPrinter statsPrinter; /** Failed nodes (but still in topology). */ - private Collection failedNodes = new HashSet<>(); + private final Collection failedNodes = new HashSet<>(); /** Leaving nodes (but still in topology). */ - private Collection leavingNodes = new HashSet<>(); + private final Collection leavingNodes = new HashSet<>(); /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ private boolean ipFinderHasLocAddr; @@ -1080,9 +1080,17 @@ class ServerImpl extends TcpDiscoveryImpl { openSock = true; + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + synchronized (failedNodes) { + for (TcpDiscoveryNode node : failedNodes) + req.addFailedNode(node); + } + } + // Handshake. - spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( - spi.getSocketTimeout())); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); @@ -1754,6 +1762,25 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Adds failed nodes specified in the received message to the local failed nodes list. + * + * @param msg Message. + */ + private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { + if (msg.failedNodes() != null) { + for (UUID nodeId : msg.failedNodes()) { + TcpDiscoveryNode failedNode = ring.node(nodeId); + + if (failedNode != null) { + synchronized (mux) { + failedNodes.add(failedNode); + } + } + } + } + } + + /** * Discovery messages history used for client reconnect. */ private class EnsuredMessageHistory { @@ -2135,6 +2162,8 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingStarted(msg); + processMessageFailedNodes(msg); + if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -2200,6 +2229,8 @@ class ServerImpl extends TcpDiscoveryImpl { checkHeartbeatsReceiving(); checkPendingCustomMessages(); + + checkFailedNodesList(); } /** @@ -2540,6 +2571,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + if (!failedNodes.isEmpty()) { + for (TcpDiscoveryNode node : failedNodes) + msg.addFailedNode(node); + } + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -2679,11 +2715,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Pending message has been sent to local node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']'); if (debugMode) debugLog("Pending message has been sent to local node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']'); + ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']'); } } @@ -3447,6 +3483,9 @@ class ServerImpl extends TcpDiscoveryImpl { spi.gridStartTime = msg.gridStartTime(); for (TcpDiscoveryNode n : top) { + assert n.internalOrder() < node.internalOrder() : + "Invalid node [topNode=" + n + ", added=" + node + ']'; + // Make all preceding nodes and local node visible. n.visible(true); } @@ -3500,6 +3539,8 @@ class ServerImpl extends TcpDiscoveryImpl { for (Map.Entry> entry : dataMap.entrySet()) spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); } + + processMessageFailedNodes(msg); } if (sendMessageToRemotes(msg)) @@ -4053,7 +4094,7 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to respond to status check message (connection refused) " + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); } - else { + else if (!spi.isNodeStopping0()){ if (pingNode(msg.creatorNode())) // Node exists and accepts incoming connections. U.error(log, "Failed to respond to status check message [recipient=" + @@ -4441,6 +4482,34 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node + * is still in the ring. + */ + private void checkFailedNodesList() { + List msgs = null; + + synchronized (mux) { + for (Iterator it = failedNodes.iterator(); it.hasNext();) { + TcpDiscoveryNode node = it.next(); + + if (ring.node(node.id()) != null) { + if (msgs == null) + msgs = new ArrayList<>(failedNodes.size()); + + msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + } + else + it.remove(); + } + } + + if (msgs != null) { + for (TcpDiscoveryNodeFailedMessage msg : msgs) + addMessage(msg); + } + } + + /** * Checks and flushes custom event messages if no nodes are attempting to join the grid. */ private void checkPendingCustomMessages() { @@ -4640,10 +4709,10 @@ class ServerImpl extends TcpDiscoveryImpl { synchronized (mux) { readers.add(reader); - - reader.start(); } + reader.start(); + spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); } } @@ -4792,6 +4861,13 @@ class ServerImpl extends TcpDiscoveryImpl { // Handshake. TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; + if (req.failedNodes() != null && req.failedNodes().contains(getLocalNodeId())) { + if (log.isDebugEnabled()) + log.debug("Ignore handshake request, local node is in failed list: " + req); + + return; + } + UUID nodeId = req.creatorNodeId(); this.nodeId = nodeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index 7ca092c..b234f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -451,7 +451,7 @@ public class TcpDiscoveryNodesRing { * topology contains less than two nodes. */ @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection excluded) { - assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode); + assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded; rwLock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 875d18e..a2e48fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,10 +19,16 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.io.Externalizable; import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.jetbrains.annotations.Nullable; /** * Base class to implement discovery messages. @@ -62,6 +68,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** Pending message index. */ private short pendingIdx; + /** */ + @GridToStringInclude + private Set failedNodes; + /** * Default no-arg constructor for {@link Externalizable} interface. */ @@ -236,6 +246,27 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { return false; } + /** + * Adds node ID to the failed nodes list. + * + * @param node Node. + */ + public void addFailedNode(TcpDiscoveryNode node) { + assert node != null; + + if (failedNodes == null) + failedNodes = new HashSet<>(); + + failedNodes.add(node.id()); + } + + /** + * @return Failed nodes IDs. + */ + @Nullable public Collection failedNodes() { + return failedNodes; + } + /** {@inheritDoc} */ @Override public final boolean equals(Object obj) { if (this == obj) http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 55474dc..5a01121 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { public void testMultiThreadedClientsServersRestart() throws Throwable { fail("https://issues.apache.org/jira/browse/IGNITE-1123"); + multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT); + } + + /** + * @throws Exception If any error occurs. + */ + public void _testMultiThreadedServersRestart() throws Throwable { + multiThreadedClientsServersRestart(GRID_CNT * 2, 0); + } + + /** + * @param srvs Number of servers. + * @param clients Number of clients. + * @throws Exception If any error occurs. + */ + private void multiThreadedClientsServersRestart(int srvs, int clients) throws Throwable { final AtomicBoolean done = new AtomicBoolean(); try { @@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - startGridsMultiThreaded(GRID_CNT); + startGridsMultiThreaded(srvs); - clientFlagGlobal = true; - - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + IgniteInternalFuture clientFut = null; final AtomicReference error = new AtomicReference<>(); - final BlockingQueue clientStopIdxs = new LinkedBlockingQueue<>(); + if (clients > 0) { + clientFlagGlobal = true; - for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) - clientStopIdxs.add(i); + startGridsMultiThreaded(srvs, clients); - final AtomicInteger clientStartIdx = new AtomicInteger(9000); + final BlockingQueue clientStopIdxs = new LinkedBlockingQueue<>(); - IgniteInternalFuture fut1 = multithreadedAsync( - new Callable() { - @Override public Object call() throws Exception { - try { - clientFlagPerThread.set(true); + for (int i = srvs; i < srvs + clients; i++) + clientStopIdxs.add(i); - while (!done.get() && error.get() == null) { - Integer stopIdx = clientStopIdxs.take(); - - log.info("Stop client: " + stopIdx); + final AtomicInteger clientStartIdx = new AtomicInteger(9000); - stopGrid(stopIdx); + clientFut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(true); while (!done.get() && error.get() == null) { - // Generate unique name to simplify debugging. - int startIdx = clientStartIdx.getAndIncrement(); + Integer stopIdx = clientStopIdxs.take(); - log.info("Start client: " + startIdx); + log.info("Stop client: " + stopIdx); - UUID id = UUID.randomUUID(); + stopGrid(stopIdx); - nodeId.set(id); + while (!done.get() && error.get() == null) { + // Generate unique name to simplify debugging. + int startIdx = clientStartIdx.getAndIncrement(); - try { - Ignite ignite = startGrid(startIdx); + log.info("Start client: " + startIdx); - assertTrue(ignite.configuration().isClientMode()); + UUID id = UUID.randomUUID(); - clientStopIdxs.add(startIdx); + nodeId.set(id); - break; - } - catch (Exception e) { - if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || - X.hasCause(e, IgniteClientDisconnectedException.class)) - log.info("Client disconnected: " + e); - else if (X.hasCause(e, ClusterTopologyCheckedException.class)) - log.info("Client failed to start: " + e); - else { - if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) - log.info("Client failed: " + e); - else - throw e; + try { + Ignite ignite = startGrid(startIdx); + + assertTrue(ignite.configuration().isClientMode()); + + clientStopIdxs.add(startIdx); + + break; + } + catch (Exception e) { + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || + X.hasCause(e, IgniteClientDisconnectedException.class)) + log.info("Client disconnected: " + e); + else if (X.hasCause(e, ClusterTopologyCheckedException.class)) + log.info("Client failed to start: " + e); + else { + if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) + log.info("Client failed: " + e); + else + throw e; + } } } } } - } - catch (Throwable e) { - log.error("Unexpected error: " + e, e); + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - error.compareAndSet(null, e); + error.compareAndSet(null, e); + + return null; + } return null; } - - return null; - } - }, - CLIENT_GRID_CNT, - "client-restart"); + }, + clients, + "client-restart"); + } final BlockingQueue srvStopIdxs = new LinkedBlockingQueue<>(); - for (int i = 0; i < GRID_CNT; i++) + for (int i = 0; i < srvs; i++) srvStopIdxs.add(i); - final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT); + final AtomicInteger srvStartIdx = new AtomicInteger(srvs + clients); - IgniteInternalFuture fut2 = multithreadedAsync( + IgniteInternalFuture srvFut = multithreadedAsync( new Callable() { @Override public Object call() throws Exception { try { @@ -312,6 +333,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { while (!done.get() && error.get() == null) { int stopIdx = srvStopIdxs.take(); + Thread.currentThread().setName("stop-server-" + getTestGridName(stopIdx)); + log.info("Stop server: " + stopIdx); stopGrid(stopIdx); @@ -319,13 +342,20 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { // Generate unique name to simplify debugging. int startIdx = srvStartIdx.getAndIncrement(); + Thread.currentThread().setName("start-server-" + getTestGridName(startIdx)); + log.info("Start server: " + startIdx); - Ignite ignite = startGrid(startIdx); + try { + Ignite ignite = startGrid(startIdx); - assertFalse(ignite.configuration().isClientMode()); + assertFalse(ignite.configuration().isClientMode()); - srvStopIdxs.add(startIdx); + srvStopIdxs.add(startIdx); + } + catch (IgniteCheckedException e) { + log.info("Failed to start: " + e); + } } } catch (Throwable e) { @@ -339,7 +369,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { return null; } }, - GRID_CNT - 1, + srvs - 1, "server-restart"); final long timeToExec = getTestTimeout() - 60_000; @@ -356,8 +386,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { done.set(true); - fut1.cancel(); - fut2.cancel(); + if (clientFut != null) + clientFut.cancel(); + + srvFut.cancel(); throw err; } @@ -367,8 +399,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { done.set(true); - fut1.get(); - fut2.get(); + if (clientFut != null) + clientFut.get(); + + srvFut.get(); } finally { done.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 51d8a2d..0e51c8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -38,6 +40,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -48,6 +52,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; @@ -64,6 +69,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -94,7 +100,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private UUID nodeId; /** */ - private TcpDiscoverySpi nodeSpi; + private static ThreadLocal nodeSpi = new ThreadLocal<>(); /** * @throws Exception If fails. @@ -108,11 +114,14 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = nodeSpi; + TcpDiscoverySpi spi = nodeSpi.get(); - if (spi == null) + if (spi == null) { spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); + } + else + nodeSpi.set(null); discoMap.put(gridName, spi); @@ -1202,11 +1211,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception { TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); final Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); final Ignite ignite1 = startGrid(1); @@ -1221,7 +1230,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { log.info("Start 2"); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite2 = startGrid(2); @@ -1271,7 +1280,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { assertEquals(1, cache.get(1)); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite = startGrid(3); @@ -1314,15 +1323,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventCoordinatorFailure(boolean twoNodes) throws Exception { TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite1 = startGrid(1); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite2 = twoNodes ? null : startGrid(2); @@ -1366,7 +1375,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { log.info("Try start one more node."); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite = startGrid(twoNodes ? 2 : 3); @@ -1381,6 +1390,268 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * Coordinator is added in failed list during node start. + * + * @throws Exception If failed. + */ + public void testFailedNodes1() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite2 = startGrid(2); + + assertEquals(2, ignite2.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list, concurrent nodes start. + * + * @throws Exception If failed. + */ + public void testFailedNodes2() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + int idx = nodeIdx.incrementAndGet(); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(idx); + + return null; + } + }, 3, "start-node"); + + Ignite ignite2 = ignite(2); + + waitForRemoteNodes(ignite2, 3); + + waitNodeStop(ignite0.name()); + + tryCreateCache(4); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list during node start, test with two nodes. + * + * @throws Exception If failed. + */ + public void testFailedNodes3() throws Exception { + try { + nodeSpi.set(new TestFailedNodesSpi(-1)); + + Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(2)); + + Ignite ignite1 = startGrid(1); + + assertEquals(1, ignite1.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1); + + startGrid(2); + + assertEquals(2, ignite1.cluster().nodes().size()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * Coordinator is added in failed list during node start, but node detected failure dies before + * sending {@link TcpDiscoveryNodeFailedMessage}. + * + * @throws Exception If failed. + */ + public void testFailedNodes4() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite1 = startGrid(1); + + TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER); + + spi.stopBeforeSndFail = true; + + nodeSpi.set(spi); + + Ignite ignite2 = startGrid(2); + + waitNodeStop(ignite2.name()); + + log.info("Try start new node."); + + Ignite ignite3 = startGrid(3); + + waitNodeStop(ignite0.name()); + + assertEquals(2, ignite1.cluster().nodes().size()); + assertEquals(2, ignite3.cluster().nodes().size()); + + tryCreateCache(2); + } + finally { + stopAllGrids(); + } + } + + /** + * @param nodeName Node name. + * @throws Exception If failed. + */ + private void waitNodeStop(final String nodeName) throws Exception { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + Ignition.ignite(nodeName); + + return false; + } + catch (IgniteIllegalStateException e) { + return true; + } + } + }, 10_000); + + if (!wait) + U.dumpThreads(log); + + assertTrue("Failed to wait for node stop.", wait); + } + + /** + * @param expNodes Expected nodes number. + */ + private void tryCreateCache(int expNodes) { + List allNodes = G.allGrids(); + + assertEquals(expNodes, allNodes.size()); + + int cntr = 0; + + for (Ignite ignite : allNodes) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("cache-" + cntr++); + + log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']'); + + ignite.getOrCreateCache(ccfg).put(1, 1); + } + } + + /** + * Simulate scenario when node detects node failure trying to send message, but node still alive. + */ + private static class TestFailedNodesSpi extends TcpDiscoverySpi { + /** */ + private AtomicBoolean failMsg = new AtomicBoolean(); + + /** */ + private int failOrder; + + /** */ + private boolean stopBeforeSndFail; + + /** */ + private boolean stop; + + /** + * @param failOrder Spi fails connection if local node order equals to this order. + */ + TestFailedNodesSpi(int failOrder) { + this.failOrder = failOrder; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, + TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { + if (stop) + return; + + if (locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeAddedMessage) && + failMsg.compareAndSet(false, true)) { + log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + throw new SocketTimeoutException(); + } + + if (stopBeforeSndFail && + locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeFailedMessage)) { + stop = true; + + log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + ignite.close(); + + return null; + } + }, "stop-node"); + + return; + } + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** * */ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi { http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 41d4b4a..e5d0a6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1117,17 +1117,18 @@ public abstract class GridAbstractTest extends TestCase { cfg.setNodeId(null); - if (gridName != null && gridName.matches(".*\\d")) { + if (gridName != null && gridName.startsWith(getTestGridName()) && gridName.matches(".*\\d")) { String idStr = UUID.randomUUID().toString(); - char[] chars = idStr.toCharArray(); + String idxStr = String.valueOf(getTestGridIndex(gridName)); + + while (idxStr.length() < 5) + idxStr = '0' + idxStr; - chars[0] = gridName.charAt(gridName.length() - 1); - chars[1] = '0'; + char[] chars = idStr.toCharArray(); - chars[chars.length - 3] = '0'; - chars[chars.length - 2] = '0'; - chars[chars.length - 1] = gridName.charAt(gridName.length() - 1); + for (int i = 0; i < idxStr.length(); i++) + chars[chars.length - idxStr.length() + i] = idxStr.charAt(i); cfg.setNodeId(UUID.fromString(new String(chars))); }