Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 19F7F200C29 for ; Mon, 13 Feb 2017 11:32:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 18AC3160B60; Mon, 13 Feb 2017 10:32:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 633AD160B81 for ; Mon, 13 Feb 2017 11:32:00 +0100 (CET) Received: (qmail 89298 invoked by uid 500); 13 Feb 2017 10:31:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 88670 invoked by uid 99); 13 Feb 2017 10:31:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Feb 2017 10:31:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 05891DFF1F; Mon, 13 Feb 2017 10:31:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Mon, 13 Feb 2017 10:32:15 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/31] ignite git commit: ignite-4499 Drop node from topology in case when connection creation is impossible archived-at: Mon, 13 Feb 2017 10:32:03 -0000 ignite-4499 Drop node from topology in case when connection creation is impossible Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9aaf035 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9aaf035 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9aaf035 Branch: refs/heads/ignite-4436-2 Commit: f9aaf0353cea54afefea4caac74b1583eb17969b Parents: ecf4b8b Author: agura Authored: Wed Jan 18 18:04:45 2017 +0300 Committer: agura Committed: Wed Jan 18 18:04:45 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../communication/tcp/TcpCommunicationSpi.java | 16 + .../ignite/spi/discovery/tcp/ClientImpl.java | 88 ++++- .../ignite/spi/discovery/tcp/ServerImpl.java | 61 ++-- .../messages/TcpDiscoveryAbstractMessage.java | 21 ++ .../tcp/TcpCommunicationSpiDropNodesTest.java | 322 +++++++++++++++++++ .../TcpCommunicationSpiFaultyClientTest.java | 270 ++++++++++++++++ .../ignite/testframework/GridTestNode.java | 1 + .../testframework/junits/GridAbstractTest.java | 2 + .../IgniteSpiCommunicationSelfTestSuite.java | 5 + .../cache/IgniteCacheAbstractQuerySelfTest.java | 8 +- 11 files changed, 758 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 0da0f49..d77b2fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -65,6 +65,9 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_NO_DISCO_ORDER = "IGNITE_NO_DISCO_ORDER"; + /** Defines reconnect delay in milliseconds for client node that was failed forcible. */ + public static final String IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY = "IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY"; + /** * If this system property is set to {@code false} - no checks for new versions will * be performed by Ignite. By default, Ignite periodically checks for the new http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1fe437c..94b7efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridSslMeta; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -2550,6 +2551,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "operating system firewall is disabled on local and remote hosts) " + "[addrs=" + addrs + ']'); + if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && + X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, + IgniteSpiOperationTimeoutException.class)) { + LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + + "cluster [" + + "rmtNode=" + node + + ", err=" + errs + + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); + + getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + + "rmtNode=" + node + + ", errs=" + errs + + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); + } + throw errs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 9a1261c..35f0908 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -100,6 +101,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -166,6 +168,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ protected MessageWorker msgWorker; + /** Force fail message for local node. */ + private TcpDiscoveryNodeFailedMessage forceFailMsg; + /** */ @GridToStringExclude private int joinCnt; @@ -450,6 +455,8 @@ class ClientImpl extends TcpDiscoveryImpl { msg.warning(warning); + msg.force(true); + msgWorker.addMessage(msg); } } @@ -1396,6 +1403,14 @@ class ClientImpl extends TcpDiscoveryImpl { else leaveLatch.countDown(); } + else if (msg instanceof TcpDiscoveryNodeFailedMessage && + ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { + TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; + + assert msg0.force() : msg0; + + forceFailMsg = msg0; + } else if (msg instanceof SocketClosedMessage) { if (((SocketClosedMessage)msg).sock == currSock) { currSock = null; @@ -1412,25 +1427,45 @@ class ClientImpl extends TcpDiscoveryImpl { } } else { - if (log.isDebugEnabled()) - log.debug("Connection closed, will try to restore connection."); + if (forceFailMsg != null) { + if (log.isDebugEnabled()) { + log.debug("Connection closed, local node received force fail message, " + + "will not try to restore connection"); + } + + queue.addFirst(SPI_RECONNECT_FAILED); + } + else { + if (log.isDebugEnabled()) + log.debug("Connection closed, will try to restore connection."); - assert reconnector == null; + assert reconnector == null; - final Reconnector reconnector = new Reconnector(join); - this.reconnector = reconnector; - reconnector.start(); + final Reconnector reconnector = new Reconnector(join); + this.reconnector = reconnector; + reconnector.start(); + } } } } else if (msg == SPI_RECONNECT_FAILED) { - reconnector.cancel(); - reconnector.join(); + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); - reconnector = null; + reconnector = null; + } + else + assert forceFailMsg != null; if (spi.isClientReconnectDisabled()) { if (state != SEGMENTED && state != STOPPED) { + if (forceFailMsg != null) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + + "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + } + if (log.isDebugEnabled()) { log.debug("Failed to restore closed connection, reconnect disabled, " + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); @@ -1445,7 +1480,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (state == STARTING || state == CONNECTED) { if (log.isDebugEnabled()) { log.debug("Failed to restore closed connection, will try to reconnect " + - "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']'); + "[networkTimeout=" + spi.netTimeout + + ", joinTimeout=" + spi.joinTimeout + + ", failMsg=" + forceFailMsg + ']'); } state = DISCONNECTED; @@ -1468,7 +1505,36 @@ class ClientImpl extends TcpDiscoveryImpl { UUID newId = UUID.randomUUID(); - if (log.isInfoEnabled()) { + if (forceFailMsg != null) { + long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, + 10_000); + + if (delay > 0) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id after " + delay + "ms (reconnect delay " + + "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + + "property) [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + + Thread.sleep(delay); + } + else { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + } + + forceFailMsg = null; + } + else if (log.isInfoEnabled()) { log.info("Client node disconnected from cluster, will try to reconnect with new id " + "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 40da281..f33566c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -775,6 +775,8 @@ class ServerImpl extends TcpDiscoveryImpl { msg.warning(warning); + msg.force(true); + msgWorker.addMessage(msg); } } @@ -4610,8 +4612,12 @@ class ServerImpl extends TcpDiscoveryImpl { else { boolean contains; + UUID creatorId = msg.creatorNodeId(); + + assert creatorId != null : msg; + synchronized (mux) { - contains = failedNodes.containsKey(sndNode); + contains = failedNodes.containsKey(sndNode) || ring.node(creatorId) == null; } if (contains) { @@ -4623,25 +4629,29 @@ class ServerImpl extends TcpDiscoveryImpl { } } - UUID nodeId = msg.failedNodeId(); + UUID failedNodeId = msg.failedNodeId(); long order = msg.order(); - TcpDiscoveryNode node = ring.node(nodeId); + TcpDiscoveryNode failedNode = ring.node(failedNodeId); - if (node != null && node.internalOrder() != order) { + if (failedNode != null && failedNode.internalOrder() != order) { if (log.isDebugEnabled()) log.debug("Ignoring node failed message since node internal order does not match " + - "[msg=" + msg + ", node=" + node + ']'); + "[msg=" + msg + ", node=" + failedNode + ']'); return; } - if (node != null) { - assert !node.isLocal() || !msg.verified() : msg; + if (failedNode != null) { + assert !failedNode.isLocal() || !msg.verified() : msg; - synchronized (mux) { - if (!failedNodes.containsKey(node)) - failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); + boolean skipUpdateFailedNodes = msg.force() && !msg.verified(); + + if (!skipUpdateFailedNodes) { + synchronized (mux) { + if (!failedNodes.containsKey(failedNode)) + failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); + } } } else { @@ -4668,11 +4678,11 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified()) { - node = ring.removeNode(nodeId); + failedNode = ring.removeNode(failedNodeId); - interruptPing(node); + interruptPing(failedNode); - assert node != null; + assert failedNode != null; long topVer; @@ -4698,16 +4708,18 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - failedNodes.remove(node); + failedNodes.remove(failedNode); - leavingNodes.remove(node); + leavingNodes.remove(failedNode); - failedNodesMsgSent.remove(node.id()); + failedNodesMsgSent.remove(failedNode.id()); - ClientMessageWorker worker = clientMsgWorkers.remove(node.id()); + if (!msg.force()) { // ClientMessageWorker will stop after sending force fail message. + ClientMessageWorker worker = clientMsgWorkers.remove(failedNode.id()); - if (worker != null) - worker.interrupt(); + if (worker != null) + worker.interrupt(); + } } if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) { @@ -4719,10 +4731,10 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - joiningNodes.remove(node.id()); + joiningNodes.remove(failedNode.id()); } - notifyDiscovery(EVT_NODE_FAILED, topVer, node); + notifyDiscovery(EVT_NODE_FAILED, topVer, failedNode); spi.stats.onNodeFailed(); } @@ -6317,7 +6329,12 @@ class ServerImpl extends TcpDiscoveryImpl { spi.failureDetectionTimeout() : spi.getSocketTimeout()); } - success = true; + boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage && + ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(clientNodeId); + + assert !clientFailed || msg.force() : msg; + + success = !clientFailed; } catch (IgniteCheckedException | IOException e) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 783a113..e982b2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -48,6 +48,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** */ protected static final int CLIENT_ACK_FLAG_POS = 4; + /** */ + protected static final int FORCE_FAIL_FLAG_POS = 8; + /** Sender of the message (transient). */ private transient UUID sndNodeId; @@ -205,6 +208,24 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** + * Get force fail node flag. + * + * @return Force fail node flag. + */ + public boolean force() { + return getFlag(FORCE_FAIL_FLAG_POS); + } + + /** + * Sets force fail node flag. + * + * @param force Force fail node flag. + */ + public void force(boolean force) { + setFlag(FORCE_FAIL_FLAG_POS, force); + } + + /** * @return Pending message index. */ public short pendingIndex() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java new file mode 100644 index 0000000..d29231e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.nio.GridCommunicationClient; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + +/** + * + */ +public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Nodes count. */ + private static final int NODES_CNT = 4; + + /** Block. */ + private static volatile boolean block; + + /** Predicate. */ + private static IgniteBiPredicate pred; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClockSyncFrequency(300000); + cfg.setFailureDetectionTimeout(1000); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setIdleConnectionTimeout(100); + spi.setSharedMemoryPort(-1); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setCommunicationSpi(spi); + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + block = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testOneNode() throws Exception { + pred = new IgniteBiPredicate() { + @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) { + return block && rmtNode.order() == 3; + } + }; + + startGrids(NODES_CNT); + + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate() { + @Override + public boolean apply(Event event) { + latch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + U.sleep(1000); // Wait for write timeout and closing idle connections. + + block = true; + + grid(0).compute().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + assertTrue(latch.await(15, TimeUnit.SECONDS)); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(3).cluster().topologyVersion() == NODES_CNT + 1; + } + }, 5000)); + + for (int i = 0; i < 10; i++) { + U.sleep(1000); + + assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size()); + + int liveNodesCnt = 0; + + for (int j = 0; j < NODES_CNT; j++) { + IgniteEx ignite; + + try { + ignite = grid(j); + + log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes()); + + ClusterNode locNode = ignite.localNode(); + + if (locNode.order() != 3) { + assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size()); + + for (ClusterNode node : ignite.cluster().nodes()) + assertTrue(node.order() != 3); + + liveNodesCnt++; + } + } + catch (Exception e) { + log.info("Checking topology for grid(" + j + "): no grid in topology."); + } + } + + assertEquals(NODES_CNT - 1, liveNodesCnt); + } + } + + /** + * @throws Exception If failed. + */ + public void testTwoNodesEachOther() throws Exception { + pred = new IgniteBiPredicate() { + @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) { + return block && (locNode.order() == 2 || locNode.order() == 4) && + (rmtNode.order() == 2 || rmtNode.order() == 4); + } + }; + + startGrids(NODES_CNT); + + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate() { + @Override + public boolean apply(Event event) { + latch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + U.sleep(1000); // Wait for write timeout and closing idle connections. + + block = true; + + final CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + barrier.await(); + + grid(1).compute().withNoFailover().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + barrier.await(); + + grid(3).compute().withNoFailover().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(2).cluster().nodes().size() == NODES_CNT - 1; + } + }, 5000); + + try { + fut1.get(); + } + catch (IgniteCheckedException e) { + // No-op. + } + + try { + fut2.get(); + } + catch (IgniteCheckedException e) { + // No-op. + } + + long failedNodeOrder = 1 + 2 + 3 + 4; + + for (ClusterNode node : grid(0).cluster().nodes()) + failedNodeOrder -= node.order(); + + for (int i = 0; i < 10; i++) { + U.sleep(1000); + + assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size()); + + int liveNodesCnt = 0; + + for (int j = 0; j < NODES_CNT; j++) { + IgniteEx ignite; + + try { + ignite = grid(j); + + log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes()); + + ClusterNode locNode = ignite.localNode(); + + if (locNode.order() != failedNodeOrder) { + assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size()); + + for (ClusterNode node : ignite.cluster().nodes()) + assertTrue(node.order() != failedNodeOrder); + + liveNodesCnt++; + } + } + catch (Exception e) { + log.info("Checking topology for grid(" + j + "): no grid in topology."); + } + } + + assertEquals(NODES_CNT - 1, liveNodesCnt); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + if (pred.apply(getLocalNode(), node)) { + Map attrs = new HashMap<>(node.attributes()); + + attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1")); + attrs.put(createAttributeName(ATTR_PORT), 47200); + attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList()); + attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList()); + + ((TcpDiscoveryNode)node).setAttributes(attrs); + } + + return super.createTcpClient(node); + } + + /** + * @param name Name. + */ + private String createAttributeName(String name) { + return getClass().getSimpleName() + '.' + name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java new file mode 100644 index 0000000..6e99487 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.nio.GridCommunicationClient; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + +/** + * Tests that faulty client will be failed if connection can't be established. + */ +public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Predicate. */ + private static final IgnitePredicate PRED = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return block && node.order() == 3; + } + }; + + /** Client mode. */ + private static boolean clientMode; + + /** Block. */ + private static volatile boolean block; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClockSyncFrequency(300000); + cfg.setFailureDetectionTimeout(1000); + cfg.setClientMode(clientMode); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setIdleConnectionTimeout(100); + spi.setSharedMemoryPort(-1); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + discoSpi.setClientReconnectDisabled(true); + + cfg.setCommunicationSpi(spi); + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + block = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNoServerOnHost() throws Exception { + testFailClient(null); + } + + /** + * @throws Exception If failed. + */ + public void testNotAcceptedConnection() throws Exception { + testFailClient(new FakeServer()); + } + + /** + * @param srv Server. + * @throws Exception If failed. + */ + private void testFailClient(FakeServer srv) throws Exception { + IgniteInternalFuture fut = null; + + try { + if (srv != null) + fut = GridTestUtils.runMultiThreadedAsync(srv, 1, "fake-server"); + + clientMode = false; + + startGrids(2); + + clientMode = true; + + startGrid(2); + startGrid(3); + + U.sleep(1000); // Wait for write timeout and closing idle connections. + + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate() { + @Override + public boolean apply(Event event) { + latch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + block = true; + + try { + grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + } + catch (IgniteException e) { + // No-op. + } + + assertTrue(latch.await(3, TimeUnit.SECONDS)); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(0).cluster().forClients().nodes().size() == 1; + } + }, 5000)); + + for (int i = 0; i < 5; i++) { + U.sleep(1000); + + log.info("Check topology (" + (i + 1) + "): " + grid(0).cluster().nodes()); + + assertEquals(1, grid(0).cluster().forClients().nodes().size()); + } + } + finally { + if (srv != null) { + srv.stop(); + + assert fut != null; + + fut.get(); + } + + stopAllGrids(); + } + } + + /** + * Server that emulates connection troubles. + */ + private static class FakeServer implements Runnable { + /** Server. */ + private final ServerSocket srv; + + /** Stop. */ + private volatile boolean stop; + + /** + * Default constructor. + */ + FakeServer() throws IOException { + this.srv = new ServerSocket(47200, 50, InetAddress.getByName("127.0.0.1")); + } + + /** + * + */ + public void stop() { + stop = true; + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + while (!stop) { + try { + U.sleep(10); + } + catch (IgniteInterruptedCheckedException e) { + // No-op. + } + } + } + finally { + U.closeQuiet(srv); + } + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + if (PRED.apply(node)) { + Map attrs = new HashMap<>(node.attributes()); + + attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1")); + attrs.put(createAttributeName(ATTR_PORT), 47200); + attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList()); + attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList()); + + ((TcpDiscoveryNode)node).setAttributes(attrs); + } + + return super.createTcpClient(node); + } + + /** + * @param name Name. + */ + private String createAttributeName(String name) { + return getClass().getSimpleName() + '.' + name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index e0e8eba..6365443 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -82,6 +82,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private void initAttributes() { attrs.put(IgniteNodeAttributes.ATTR_BUILD_VER, "10"); attrs.put(IgniteNodeAttributes.ATTR_GRID_NAME, "null"); + attrs.put(IgniteNodeAttributes.ATTR_CLIENT_MODE, false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 9f507e6..ffaec4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -102,6 +102,7 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -173,6 +174,7 @@ public abstract class GridAbstractTest extends TestCase { static { System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000"); System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false"); + System.setProperty(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, "1"); if (BINARY_MARSHALLER) GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index c557fbb..ddc2551 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -35,6 +35,8 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpFailure import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpNoDelayOffSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTest; import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest; /** * Test suite for all communication SPIs. @@ -72,6 +74,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class)); + suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class)); + suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class)); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 7e0d20b..9f56877 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -633,7 +633,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assertEquals(2, qry.getAll().size()); - Throwable throwable = GridTestUtils.assertThrowsInherited(log, new GridPlainCallable() { + GridTestUtils.assertThrows(log, new GridPlainCallable() { @Override public Void call() throws Exception { QueryCursor> qry = cache.query(new SqlQuery(Type1.class, "FROM Type1")); @@ -642,11 +642,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac return null; } - }, RuntimeException.class, null); - - assertNotNull(throwable); - - assertTrue(throwable instanceof IgniteException || throwable instanceof CacheException); + }, CacheException.class, null); } /**