ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/5] ignite git commit: ignite-3418 Avoid unnecessary discovery messages
Date Tue, 05 Jul 2016 12:13:16 GMT
ignite-3418 Avoid unnecessary discovery messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0a51289
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0a51289
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0a51289

Branch: refs/heads/ignite-3414
Commit: b0a512890fd59cc2a6c1cb741d0f29f185afebd8
Parents: a8360a5
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jul 5 08:40:11 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jul 5 08:40:11 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 107 ++++++---
 .../tcp/internal/TcpDiscoveryStatistics.java    |  45 +++-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 222 +++++++++++++++++++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  52 ++++-
 5 files changed, 388 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 834922c..79e58b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -616,7 +616,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+                spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp, 0);
 
                 if (log.isDebugEnabled())
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8621496..7f689c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -203,7 +203,10 @@ class ServerImpl extends TcpDiscoveryImpl {
     private StatisticsPrinter statsPrinter;
 
     /** Failed nodes (but still in topology). */
-    private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+    private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>();
+
+    /** */
+    private final Collection<UUID> failedNodesMsgSent = new HashSet<>();
 
     /** Leaving nodes (but still in topology). */
     private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
@@ -779,7 +782,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         if (nodeAlive) {
             synchronized (mux) {
-                nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) &&
+                nodeAlive = !F.transform(failedNodes.keySet(), F.node2id()).contains(nodeId)
&&
                     !F.transform(leavingNodes, F.node2id()).contains(nodeId);
             }
         }
@@ -1104,7 +1107,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     boolean ignore = false;
 
                     synchronized (failedNodes) {
-                        for (TcpDiscoveryNode failedNode : failedNodes) {
+                        for (TcpDiscoveryNode failedNode : failedNodes.keySet()) {
                             if (failedNode.id().equals(res.creatorNodeId())) {
                                 if (log.isDebugEnabled())
                                     log.debug("Ignore response from node from failed list:
" + res);
@@ -1134,7 +1137,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+                long tstamp0 = U.currentTimeMillis();
 
                 if (debugMode)
                     debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
@@ -1149,7 +1152,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // E.g. due to class not found issue.
                 joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
 
-                return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
+                int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
+
+                spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0);
+
+                return receipt;
             }
             catch (ClassCastException e) {
                 // This issue is rarely reproducible on AmazonEC2, but never
@@ -1371,7 +1378,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     @Nullable private TcpDiscoveryNode resolveCoordinator(
         @Nullable Collection<TcpDiscoveryNode> filter) {
         synchronized (mux) {
-            Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes);
+            Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes.keySet(),
leavingNodes);
 
             if (!F.isEmpty(filter))
                 excluded = F.concat(false, excluded, filter);
@@ -1526,7 +1533,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         TcpDiscoveryNode next;
 
         synchronized (mux) {
-            next = ring.nextNode(failedNodes);
+            next = ring.nextNode(failedNodes.keySet());
         }
 
         if (next != null)
@@ -1597,7 +1604,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             b.append("Failed nodes: ").append(U.nl());
 
-            for (TcpDiscoveryNode node : failedNodes)
+            for (TcpDiscoveryNode node : failedNodes.keySet())
                 b.append("    ").append(node.id()).append(U.nl());
 
             b.append(U.nl());
@@ -1792,10 +1799,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 if (failedNode != null) {
                     if (!failedNode.isLocal()) {
-                        boolean added;
+                        boolean added = false;
 
                         synchronized (mux) {
-                            added = failedNodes.add(failedNode);
+                            if (!failedNodes.containsKey(failedNode)) {
+                                failedNodes.put(failedNode, msg.senderNodeId() != null ?
msg.senderNodeId() : getLocalNodeId());
+
+                                added = true;
+                            }
                         }
 
                         if (added && log.isDebugEnabled())
@@ -2403,7 +2414,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             TcpDiscoverySpiState state;
 
             synchronized (mux) {
-                failedNodes = U.arrayList(ServerImpl.this.failedNodes);
+                failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
 
                 state = spiState;
             }
@@ -2633,12 +2644,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
 
-                            boolean sndPending =
-                                (newNextNode && ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE)
>= 0) ||
-                                    failure ||
-                                    forceSndPending;
-
-                            if (sndPending) {
+                            if (failure || forceSndPending) {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent [failure=" +
failure +
                                         ", newNextNode=" + newNextNode +
@@ -2666,10 +2672,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         clearNodeAddedMessage(pendingMsg);
                                     }
 
-                                    spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
+                                    long tstamp0 = U.currentTimeMillis();
 
                                     int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
+                                    spi.stats.onMessageSent(pendingMsg, tstamp0 - tstamp,
U.currentTimeMillis() - tstamp0);
+
                                     if (log.isDebugEnabled())
                                         log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -2713,10 +2721,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                                spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+                                long tstamp0 = U.currentTimeMillis();
 
                                 int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
+                                spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0);
+
                                 onMessageExchanged();
 
                                 if (log.isDebugEnabled()) {
@@ -2818,7 +2828,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             synchronized (mux) {
-                failedNodes.removeAll(ServerImpl.this.failedNodes);
+                failedNodes.removeAll(ServerImpl.this.failedNodes.keySet());
             }
 
             if (!failedNodes.isEmpty()) {
@@ -2832,7 +2842,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 synchronized (mux) {
-                    ServerImpl.this.failedNodes.addAll(failedNodes);
+                    for (TcpDiscoveryNode failedNode : failedNodes) {
+                        if (!ServerImpl.this.failedNodes.containsKey(failedNode))
+                            ServerImpl.this.failedNodes.put(failedNode, locNodeId);
+                    }
+
+                    for (TcpDiscoveryNode failedNode : failedNodes)
+                        failedNodesMsgSent.add(failedNode.id());
                 }
 
                 for (TcpDiscoveryNode n : failedNodes)
@@ -4214,6 +4230,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     failedNodes.remove(leftNode);
 
                     leavingNodes.remove(leftNode);
+
+                    failedNodesMsgSent.remove(leftNode.id());
                 }
             }
 
@@ -4273,7 +4291,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     boolean contains;
 
                     synchronized (mux) {
-                        contains = failedNodes.contains(sndNode);
+                        contains = failedNodes.containsKey(sndNode);
                     }
 
                     if (contains) {
@@ -4302,7 +4320,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 assert !node.isLocal() || !msg.verified() : msg;
 
                 synchronized (mux) {
-                    failedNodes.add(node);
+                    if (!failedNodes.containsKey(node))
+                        failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId()
: getLocalNodeId());
                 }
             }
             else {
@@ -4363,6 +4382,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     leavingNodes.remove(node);
 
+                    failedNodesMsgSent.remove(node.id());
+
                     ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
 
                     if (worker != null)
@@ -4638,7 +4659,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     boolean failedNode;
 
                                     synchronized (mux) {
-                                        failedNode = failedNodes.contains(clientNode);
+                                        failedNode = failedNodes.containsKey(clientNode);
                                     }
 
                                     if (!failedNode) {
@@ -4873,23 +4894,43 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /**
          * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed
node is still in the
-         * ring.
+         * ring and node detected failure left ring.
          */
         private void checkFailedNodesList() {
             List<TcpDiscoveryNodeFailedMessage> msgs = null;
 
             synchronized (mux) {
-                for (Iterator<TcpDiscoveryNode> it = failedNodes.iterator(); it.hasNext();
) {
-                    TcpDiscoveryNode node = it.next();
+                if (!failedNodes.isEmpty()) {
+                    for (Iterator<Map.Entry<TcpDiscoveryNode, UUID>> it = failedNodes.entrySet().iterator();
it.hasNext(); ) {
+                        Map.Entry<TcpDiscoveryNode, UUID> e = it.next();
 
-                    if (ring.node(node.id()) != null) {
-                        if (msgs == null)
-                            msgs = new ArrayList<>(failedNodes.size());
+                        TcpDiscoveryNode node = e.getKey();
+                        UUID failSndNode = e.getValue();
 
-                        msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(),
node.internalOrder()));
+                        if (ring.node(node.id()) == null) {
+                            it.remove();
+
+                            continue;
+                        }
+
+                        if (!nodeAlive(failSndNode) && !failedNodesMsgSent.contains(node.id()))
{
+                            if (msgs == null)
+                                msgs = new ArrayList<>();
+
+                            msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
node.id(), node.internalOrder()));
+
+                            failedNodesMsgSent.add(node.id());
+                        }
+                    }
+                }
+
+                if (!failedNodesMsgSent.isEmpty()) {
+                    for (Iterator<UUID> it = failedNodesMsgSent.iterator(); it.hasNext();
) {
+                        UUID nodeId = it.next();
+
+                        if (ring.node(nodeId) == null)
+                            it.remove();
                     }
-                    else
-                        it.remove();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index f6232ba..5b20ef6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -89,6 +89,14 @@ public class TcpDiscoveryStatistics {
     /** Ring messages sent timestamps. */
     private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024);
 
+    /** */
+    @GridToStringInclude
+    private final Map<String, Long> avgMsgsAckTimes = new HashMap<>();
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, Long> maxMsgsAckTimes = new HashMap<>();
+
     /** Average time messages is in queue. */
     private long avgMsgQueueTime;
 
@@ -302,8 +310,9 @@ public class TcpDiscoveryStatistics {
      *
      * @param msg Sent message.
      * @param time Time taken to serialize message.
+     * @param ackTime Time taken to receive message acknowledge.
      */
-    public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time) {
+    public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time, long
ackTime) {
         assert msg != null;
         assert time >= 0 : time;
 
@@ -326,7 +335,24 @@ public class TcpDiscoveryStatistics {
 
         sentMsgs.put(msg.getClass().getSimpleName(), ++cnt);
 
-        Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new
Callable<Long>() {
+        addTimeInfo(avgMsgsSndTimes, maxMsgsSndTimes, msg, cnt, time);
+
+        addTimeInfo(avgMsgsAckTimes, maxMsgsAckTimes, msg, cnt, time);
+    }
+
+    /**
+     * @param avgTimes Average times.
+     * @param maxTimes Max times.
+     * @param msg Message.
+     * @param cnt Total message count.
+     * @param time Time.
+     */
+    private void addTimeInfo(Map<String, Long> avgTimes,
+        Map<String, Long> maxTimes,
+        TcpDiscoveryAbstractMessage msg,
+        int cnt,
+        long time) {
+        Long avgTime = F.addIfAbsent(avgTimes, msg.getClass().getSimpleName(), new Callable<Long>()
{
             @Override public Long call() {
                 return 0L;
             }
@@ -336,9 +362,9 @@ public class TcpDiscoveryStatistics {
 
         avgTime = (avgTime * (cnt - 1) + time) / cnt;
 
-        avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime);
+        avgTimes.put(msg.getClass().getSimpleName(), avgTime);
 
-        Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new
Callable<Long>() {
+        Long maxTime = F.addIfAbsent(maxTimes, msg.getClass().getSimpleName(), new Callable<Long>()
{
             @Override public Long call() {
                 return 0L;
             }
@@ -347,7 +373,7 @@ public class TcpDiscoveryStatistics {
         assert maxTime != null;
 
         if (time > maxTime)
-            maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time);
+            maxTimes.put(msg.getClass().getSimpleName(), time);
     }
 
     /**
@@ -474,6 +500,13 @@ public class TcpDiscoveryStatistics {
     }
 
     /**
+     * @return Sent messages counts (grouped by type).
+     */
+    public synchronized Map<String, Integer> sentMessages() {
+        return new HashMap<>(sentMsgs);
+    }
+
+    /**
      * Gets max messages send time (grouped by type).
      *
      * @return Map containing messages types and max send times.
@@ -648,6 +681,8 @@ public class TcpDiscoveryStatistics {
         sockReadersCreated = 0;
         sockReadersRmv = 0;
         sockTimeoutsCnt = 0;
+        avgMsgsAckTimes.clear();
+        maxMsgsAckTimes.clear();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 5053c2d..28b527e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -28,9 +28,17 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -42,6 +50,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -483,4 +492,217 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testCustomEventOnJoinCoordinatorStop() throws Exception {
+        for (int k = 0; k < 10; k++) {
+            log.info("Iteration: " + k);
+
+            clientFlagGlobal = false;
+
+            final int START_NODES = 5;
+            final int JOIN_NODES = 5;
+
+            startGrids(START_NODES);
+
+            final AtomicInteger startIdx = new AtomicInteger(START_NODES);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    CacheConfiguration ccfg = new CacheConfiguration();
+
+                    Ignite ignite = ignite(START_NODES - 1);
+
+                    while (!stop.get()) {
+                        ignite.createCache(ccfg);
+
+                        ignite.destroyCache(ccfg.getName());
+                    }
+
+                    return null;
+                }
+            });
+
+            try {
+                final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1);
+
+                IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new
Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int idx = startIdx.getAndIncrement();
+
+                        Thread.currentThread().setName("start-thread-" + idx);
+
+                        barrier.await();
+
+                        Ignite ignite = startGrid(idx);
+
+                        assertFalse(ignite.configuration().isClientMode());
+
+                        log.info("Started node: " + ignite.name());
+
+                        IgniteCache<Object, Object> cache = ignite.getOrCreateCache((String)null);
+
+                        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>()
{
+                            @Override public void onUpdated(Iterable<CacheEntryEvent<?,
?>> evts) {
+                                // No-op.
+                            }
+                        });
+
+                        QueryCursor<Cache.Entry<Object, Object>> cur = cache.query(qry);
+
+                        cur.close();
+
+                        return null;
+                    }
+                }, JOIN_NODES, "start-thread");
+
+                barrier.await();
+
+                U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+                for (int i = 0; i < START_NODES - 1; i++) {
+                    GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure");
+
+                    stopGrid(i);
+                }
+
+                stop.set(true);
+
+                fut1.get();
+                fut2.get();
+            }
+            finally {
+                stop.set(true);
+
+                fut1.get();
+            }
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testClientContinuousQueryCoordinatorStop() throws Exception {
+        for (int k = 0; k < 10; k++) {
+            log.info("Iteration: " + k);
+
+            clientFlagGlobal = false;
+
+            final int START_NODES = 5;
+            final int JOIN_NODES = 5;
+
+            startGrids(START_NODES);
+
+            ignite(0).createCache(new CacheConfiguration<>());
+
+            final AtomicInteger startIdx = new AtomicInteger(START_NODES);
+
+            final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1);
+
+            clientFlagGlobal = true;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>()
{
+                @Override public Object call() throws Exception {
+                    int idx = startIdx.getAndIncrement();
+
+                    Thread.currentThread().setName("start-thread-" + idx);
+
+                    barrier.await();
+
+                    Ignite ignite = startGrid(idx);
+                    assertTrue(ignite.configuration().isClientMode());
+
+                    log.info("Started node: " + ignite.name());
+
+                    IgniteCache<Object, Object> cache = ignite.getOrCreateCache((String)null);
+
+                    for (int i = 0; i < 10; i++) {
+                        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>()
{
+                            @Override public void onUpdated(Iterable<CacheEntryEvent<?,
?>> evts) {
+                                // No-op.
+                            }
+                        });
+
+                        cache.query(qry);
+                    }
+
+                    return null;
+                }
+            }, JOIN_NODES, "start-thread");
+
+            barrier.await();
+
+            U.sleep(ThreadLocalRandom.current().nextInt(100, 500));
+
+            for (int i = 0; i < START_NODES - 1; i++) {
+                GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure");
+
+                stopGrid(i);
+            }
+
+            fut.get();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testCustomEventNodeRestart() throws Exception {
+        clientFlagGlobal = false;
+
+        Ignite ignite = startGrid(0);
+
+        ignite.getOrCreateCache(new CacheConfiguration<>());
+
+        final long stopTime = System.currentTimeMillis() + 60_000;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                try {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Ignite ignite = startGrid(idx + 1);
+
+                        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+                        int qryCnt = ThreadLocalRandom.current().nextInt(10) + 1;
+
+                        for (int i = 0; i < qryCnt; i++) {
+                            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                            qry.setLocalListener(new CacheEntryUpdatedListener<Object,
Object>() {
+                                @Override public void onUpdated(Iterable<CacheEntryEvent<?,
?>> evts) {
+                                    // No-op.
+                                }
+                            });
+
+                            QueryCursor<Cache.Entry<Object, Object>> cur = cache.query(qry);
+
+                            cur.close();
+                        }
+
+                        GridTestUtils.invoke(ignite.configuration().getDiscoverySpi(), "simulateNodeFailure");
+
+                        ignite.close();
+                    }
+                }
+                catch (Exception e) {
+                    log.error("Unexpected error: " + e, e);
+
+                    throw new IgniteException(e);
+                }
+            }
+        }, 5, "node-restart");
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a51289/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 2f408a2..32d9072 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -68,6 +69,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -1814,6 +1816,54 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNoExtraNodeFailedMessage() throws Exception {
+        try {
+            final int NODES = 10;
+
+            startGridsMultiThreaded(NODES);
+
+            int stopIdx = 5;
+
+            Ignite failIgnite = ignite(stopIdx);
+
+            ((TcpDiscoverySpi)failIgnite.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+            for (int i = 0; i < NODES; i++) {
+                if (i != stopIdx) {
+                    final Ignite ignite = ignite(i);
+
+                    GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return ignite.cluster().topologyVersion() >= NODES + 1;
+                        }
+                    }, 10_000);
+
+                    TcpDiscoverySpi spi = (TcpDiscoverySpi)ignite.configuration().getDiscoverySpi();
+
+                    TcpDiscoveryStatistics stats = GridTestUtils.getFieldValue(spi, "stats");
+
+                    Integer cnt = stats.sentMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName());
+
+                    log.info("Count1: " + cnt);
+
+                    assertTrue("Invalid message count: " + cnt, cnt == null || cnt <=
2);
+
+                    cnt = stats.receivedMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName());
+
+                    log.info("Count2: " + cnt);
+
+                    assertTrue("Invalid message count: " + cnt, cnt == null || cnt <=
2);
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param nodeName Node name.
      * @throws Exception If failed.
      */
@@ -1829,7 +1879,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                     return true;
                 }
             }
-        }, 10_000);
+        }, 30_000);
 
         if (!wait)
             U.dumpThreads(log);


Mime
View raw message