Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D042A200B35 for ; Tue, 5 Jul 2016 08:55:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CED11160A4F; Tue, 5 Jul 2016 06:55:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5D580160A6F for ; Tue, 5 Jul 2016 08:55:00 +0200 (CEST) Received: (qmail 90506 invoked by uid 500); 5 Jul 2016 06:54:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 90424 invoked by uid 99); 5 Jul 2016 06:54:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 06:54:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35300EC227; Tue, 5 Jul 2016 06:54:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Tue, 05 Jul 2016 06:55:01 -0000 Message-Id: <60ceca582f9446258144c03703f15872@git.apache.org> In-Reply-To: <8d1e515cc3e94ee29f70d8b51f5dde1c@git.apache.org> References: <8d1e515cc3e94ee29f70d8b51f5dde1c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] ignite git commit: ignite-3418 Avoid unnecessary discovery messages archived-at: Tue, 05 Jul 2016 06:55:02 -0000 ignite-3418 Avoid unnecessary discovery messages Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0a51289 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0a51289 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0a51289 Branch: refs/heads/ignite-3154 Commit: b0a512890fd59cc2a6c1cb741d0f29f185afebd8 Parents: a8360a5 Author: sboikov Authored: Tue Jul 5 08:40:11 2016 +0300 Committer: sboikov Committed: Tue Jul 5 08:40:11 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 107 ++++++--- .../tcp/internal/TcpDiscoveryStatistics.java | 45 +++- .../tcp/TcpDiscoveryMultiThreadedTest.java | 222 +++++++++++++++++++ .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 52 ++++- 5 files changed, 388 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 834922c..79e58b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -616,7 +616,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp, 0); if (log.isDebugEnabled()) log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 8621496..7f689c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -203,7 +203,10 @@ class ServerImpl extends TcpDiscoveryImpl { private StatisticsPrinter statsPrinter; /** Failed nodes (but still in topology). */ - private final Collection failedNodes = new HashSet<>(); + private final Map failedNodes = new HashMap<>(); + + /** */ + private final Collection failedNodesMsgSent = new HashSet<>(); /** Leaving nodes (but still in topology). */ private final Collection leavingNodes = new HashSet<>(); @@ -779,7 +782,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeAlive) { synchronized (mux) { - nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && + nodeAlive = !F.transform(failedNodes.keySet(), F.node2id()).contains(nodeId) && !F.transform(leavingNodes, F.node2id()).contains(nodeId); } } @@ -1104,7 +1107,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean ignore = false; synchronized (failedNodes) { - for (TcpDiscoveryNode failedNode : failedNodes) { + for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(res.creatorNodeId())) { if (log.isDebugEnabled()) log.debug("Ignore response from node from failed list: " + res); @@ -1134,7 +1137,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); if (debugMode) debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr + @@ -1149,7 +1152,11 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + + spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + + return receipt; } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -1371,7 +1378,7 @@ class ServerImpl extends TcpDiscoveryImpl { @Nullable private TcpDiscoveryNode resolveCoordinator( @Nullable Collection filter) { synchronized (mux) { - Collection excluded = F.concat(false, failedNodes, leavingNodes); + Collection excluded = F.concat(false, failedNodes.keySet(), leavingNodes); if (!F.isEmpty(filter)) excluded = F.concat(false, excluded, filter); @@ -1526,7 +1533,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNode next; synchronized (mux) { - next = ring.nextNode(failedNodes); + next = ring.nextNode(failedNodes.keySet()); } if (next != null) @@ -1597,7 +1604,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Failed nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : failedNodes) + for (TcpDiscoveryNode node : failedNodes.keySet()) b.append(" ").append(node.id()).append(U.nl()); b.append(U.nl()); @@ -1792,10 +1799,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (failedNode != null) { if (!failedNode.isLocal()) { - boolean added; + boolean added = false; synchronized (mux) { - added = failedNodes.add(failedNode); + if (!failedNodes.containsKey(failedNode)) { + failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); + + added = true; + } } if (added && log.isDebugEnabled()) @@ -2403,7 +2414,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state; synchronized (mux) { - failedNodes = U.arrayList(ServerImpl.this.failedNodes); + failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet()); state = spiState; } @@ -2633,12 +2644,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; - boolean sndPending = - (newNextNode && ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) || - failure || - forceSndPending; - - if (sndPending) { + if (failure || forceSndPending) { if (log.isDebugEnabled()) log.debug("Pending messages will be sent [failure=" + failure + ", newNextNode=" + newNextNode + @@ -2666,10 +2672,12 @@ class ServerImpl extends TcpDiscoveryImpl { clearNodeAddedMessage(pendingMsg); } - spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.stats.onMessageSent(pendingMsg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msgId=" + msg.id() + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + @@ -2713,10 +2721,12 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + onMessageExchanged(); if (log.isDebugEnabled()) { @@ -2818,7 +2828,7 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - failedNodes.removeAll(ServerImpl.this.failedNodes); + failedNodes.removeAll(ServerImpl.this.failedNodes.keySet()); } if (!failedNodes.isEmpty()) { @@ -2832,7 +2842,13 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - ServerImpl.this.failedNodes.addAll(failedNodes); + for (TcpDiscoveryNode failedNode : failedNodes) { + if (!ServerImpl.this.failedNodes.containsKey(failedNode)) + ServerImpl.this.failedNodes.put(failedNode, locNodeId); + } + + for (TcpDiscoveryNode failedNode : failedNodes) + failedNodesMsgSent.add(failedNode.id()); } for (TcpDiscoveryNode n : failedNodes) @@ -4214,6 +4230,8 @@ class ServerImpl extends TcpDiscoveryImpl { failedNodes.remove(leftNode); leavingNodes.remove(leftNode); + + failedNodesMsgSent.remove(leftNode.id()); } } @@ -4273,7 +4291,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean contains; synchronized (mux) { - contains = failedNodes.contains(sndNode); + contains = failedNodes.containsKey(sndNode); } if (contains) { @@ -4302,7 +4320,8 @@ class ServerImpl extends TcpDiscoveryImpl { assert !node.isLocal() || !msg.verified() : msg; synchronized (mux) { - failedNodes.add(node); + if (!failedNodes.containsKey(node)) + failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); } } else { @@ -4363,6 +4382,8 @@ class ServerImpl extends TcpDiscoveryImpl { leavingNodes.remove(node); + failedNodesMsgSent.remove(node.id()); + ClientMessageWorker worker = clientMsgWorkers.remove(node.id()); if (worker != null) @@ -4638,7 +4659,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean failedNode; synchronized (mux) { - failedNode = failedNodes.contains(clientNode); + failedNode = failedNodes.containsKey(clientNode); } if (!failedNode) { @@ -4873,23 +4894,43 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the - * ring. + * ring and node detected failure left ring. */ private void checkFailedNodesList() { List msgs = null; synchronized (mux) { - for (Iterator it = failedNodes.iterator(); it.hasNext(); ) { - TcpDiscoveryNode node = it.next(); + if (!failedNodes.isEmpty()) { + for (Iterator> it = failedNodes.entrySet().iterator(); it.hasNext(); ) { + Map.Entry e = it.next(); - if (ring.node(node.id()) != null) { - if (msgs == null) - msgs = new ArrayList<>(failedNodes.size()); + TcpDiscoveryNode node = e.getKey(); + UUID failSndNode = e.getValue(); - msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + if (ring.node(node.id()) == null) { + it.remove(); + + continue; + } + + if (!nodeAlive(failSndNode) && !failedNodesMsgSent.contains(node.id())) { + if (msgs == null) + msgs = new ArrayList<>(); + + msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + + failedNodesMsgSent.add(node.id()); + } + } + } + + if (!failedNodesMsgSent.isEmpty()) { + for (Iterator it = failedNodesMsgSent.iterator(); it.hasNext(); ) { + UUID nodeId = it.next(); + + if (ring.node(nodeId) == null) + it.remove(); } - else - it.remove(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java index f6232ba..5b20ef6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -89,6 +89,14 @@ public class TcpDiscoveryStatistics { /** Ring messages sent timestamps. */ private final Map ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024); + /** */ + @GridToStringInclude + private final Map avgMsgsAckTimes = new HashMap<>(); + + /** */ + @GridToStringInclude + private final Map maxMsgsAckTimes = new HashMap<>(); + /** Average time messages is in queue. */ private long avgMsgQueueTime; @@ -302,8 +310,9 @@ public class TcpDiscoveryStatistics { * * @param msg Sent message. * @param time Time taken to serialize message. + * @param ackTime Time taken to receive message acknowledge. */ - public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time) { + public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time, long ackTime) { assert msg != null; assert time >= 0 : time; @@ -326,7 +335,24 @@ public class TcpDiscoveryStatistics { sentMsgs.put(msg.getClass().getSimpleName(), ++cnt); - Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new Callable() { + addTimeInfo(avgMsgsSndTimes, maxMsgsSndTimes, msg, cnt, time); + + addTimeInfo(avgMsgsAckTimes, maxMsgsAckTimes, msg, cnt, time); + } + + /** + * @param avgTimes Average times. + * @param maxTimes Max times. + * @param msg Message. + * @param cnt Total message count. + * @param time Time. + */ + private void addTimeInfo(Map avgTimes, + Map maxTimes, + TcpDiscoveryAbstractMessage msg, + int cnt, + long time) { + Long avgTime = F.addIfAbsent(avgTimes, msg.getClass().getSimpleName(), new Callable() { @Override public Long call() { return 0L; } @@ -336,9 +362,9 @@ public class TcpDiscoveryStatistics { avgTime = (avgTime * (cnt - 1) + time) / cnt; - avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime); + avgTimes.put(msg.getClass().getSimpleName(), avgTime); - Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new Callable() { + Long maxTime = F.addIfAbsent(maxTimes, msg.getClass().getSimpleName(), new Callable() { @Override public Long call() { return 0L; } @@ -347,7 +373,7 @@ public class TcpDiscoveryStatistics { assert maxTime != null; if (time > maxTime) - maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time); + maxTimes.put(msg.getClass().getSimpleName(), time); } /** @@ -474,6 +500,13 @@ public class TcpDiscoveryStatistics { } /** + * @return Sent messages counts (grouped by type). + */ + public synchronized Map sentMessages() { + return new HashMap<>(sentMsgs); + } + + /** * Gets max messages send time (grouped by type). * * @return Map containing messages types and max send times. @@ -648,6 +681,8 @@ public class TcpDiscoveryStatistics { sockReadersCreated = 0; sockReadersRmv = 0; sockTimeoutsCnt = 0; + avgMsgsAckTimes.clear(); + maxMsgsAckTimes.clear(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 5053c2d..28b527e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -28,9 +28,17 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -42,6 +50,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -483,4 +492,217 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { stopAllGrids(); } } + + /** + * @throws Exception If failed. + */ + public void _testCustomEventOnJoinCoordinatorStop() throws Exception { + for (int k = 0; k < 10; k++) { + log.info("Iteration: " + k); + + clientFlagGlobal = false; + + final int START_NODES = 5; + final int JOIN_NODES = 5; + + startGrids(START_NODES); + + final AtomicInteger startIdx = new AtomicInteger(START_NODES); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + Ignite ignite = ignite(START_NODES - 1); + + while (!stop.get()) { + ignite.createCache(ccfg); + + ignite.destroyCache(ccfg.getName()); + } + + return null; + } + }); + + try { + final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1); + + IgniteInternalFuture fut2 = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + int idx = startIdx.getAndIncrement(); + + Thread.currentThread().setName("start-thread-" + idx); + + barrier.await(); + + Ignite ignite = startGrid(idx); + + assertFalse(ignite.configuration().isClientMode()); + + log.info("Started node: " + ignite.name()); + + IgniteCache cache = ignite.getOrCreateCache((String)null); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) { + // No-op. + } + }); + + QueryCursor> cur = cache.query(qry); + + cur.close(); + + return null; + } + }, JOIN_NODES, "start-thread"); + + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(10, 100)); + + for (int i = 0; i < START_NODES - 1; i++) { + GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure"); + + stopGrid(i); + } + + stop.set(true); + + fut1.get(); + fut2.get(); + } + finally { + stop.set(true); + + fut1.get(); + } + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void _testClientContinuousQueryCoordinatorStop() throws Exception { + for (int k = 0; k < 10; k++) { + log.info("Iteration: " + k); + + clientFlagGlobal = false; + + final int START_NODES = 5; + final int JOIN_NODES = 5; + + startGrids(START_NODES); + + ignite(0).createCache(new CacheConfiguration<>()); + + final AtomicInteger startIdx = new AtomicInteger(START_NODES); + + final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1); + + clientFlagGlobal = true; + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + int idx = startIdx.getAndIncrement(); + + Thread.currentThread().setName("start-thread-" + idx); + + barrier.await(); + + Ignite ignite = startGrid(idx); + assertTrue(ignite.configuration().isClientMode()); + + log.info("Started node: " + ignite.name()); + + IgniteCache cache = ignite.getOrCreateCache((String)null); + + for (int i = 0; i < 10; i++) { + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) { + // No-op. + } + }); + + cache.query(qry); + } + + return null; + } + }, JOIN_NODES, "start-thread"); + + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(100, 500)); + + for (int i = 0; i < START_NODES - 1; i++) { + GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure"); + + stopGrid(i); + } + + fut.get(); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void _testCustomEventNodeRestart() throws Exception { + clientFlagGlobal = false; + + Ignite ignite = startGrid(0); + + ignite.getOrCreateCache(new CacheConfiguration<>()); + + final long stopTime = System.currentTimeMillis() + 60_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + try { + while (System.currentTimeMillis() < stopTime) { + Ignite ignite = startGrid(idx + 1); + + IgniteCache cache = ignite.cache(null); + + int qryCnt = ThreadLocalRandom.current().nextInt(10) + 1; + + for (int i = 0; i < qryCnt; i++) { + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) { + // No-op. + } + }); + + QueryCursor> cur = cache.query(qry); + + cur.close(); + } + + GridTestUtils.invoke(ignite.configuration().getDiscoverySpi(), "simulateNodeFailure"); + + ignite.close(); + } + } + catch (Exception e) { + log.error("Unexpected error: " + e, e); + + throw new IgniteException(e); + } + } + }, 5, "node-restart"); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 2f408a2..32d9072 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -68,6 +69,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -1814,6 +1816,54 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNoExtraNodeFailedMessage() throws Exception { + try { + final int NODES = 10; + + startGridsMultiThreaded(NODES); + + int stopIdx = 5; + + Ignite failIgnite = ignite(stopIdx); + + ((TcpDiscoverySpi)failIgnite.configuration().getDiscoverySpi()).simulateNodeFailure(); + + for (int i = 0; i < NODES; i++) { + if (i != stopIdx) { + final Ignite ignite = ignite(i); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return ignite.cluster().topologyVersion() >= NODES + 1; + } + }, 10_000); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)ignite.configuration().getDiscoverySpi(); + + TcpDiscoveryStatistics stats = GridTestUtils.getFieldValue(spi, "stats"); + + Integer cnt = stats.sentMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName()); + + log.info("Count1: " + cnt); + + assertTrue("Invalid message count: " + cnt, cnt == null || cnt <= 2); + + cnt = stats.receivedMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName()); + + log.info("Count2: " + cnt); + + assertTrue("Invalid message count: " + cnt, cnt == null || cnt <= 2); + } + } + } + finally { + stopAllGrids(); + } + } + + /** * @param nodeName Node name. * @throws Exception If failed. */ @@ -1829,7 +1879,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return true; } } - }, 10_000); + }, 30_000); if (!wait) U.dumpThreads(log);