Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 32C021774A for ; Wed, 15 Apr 2015 09:29:03 +0000 (UTC) Received: (qmail 26606 invoked by uid 500); 15 Apr 2015 09:29:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 26543 invoked by uid 500); 15 Apr 2015 09:29:03 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 26491 invoked by uid 99); 15 Apr 2015 09:29:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Apr 2015 09:29:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 15 Apr 2015 09:28:59 +0000 Received: (qmail 24724 invoked by uid 99); 15 Apr 2015 09:28:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Apr 2015 09:28:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A0E8DDFA69; Wed, 15 Apr 2015 09:28:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 15 Apr 2015 09:28:39 -0000 Message-Id: <663976c1e91444c7b40d079cbb7d8cb1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/23] incubator-ignite git commit: IGNITE-714 - Fail-fast node failure detection 
X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-4 0eadae95a -> f4dac4ed5 IGNITE-714 - Fail-fast node failure detection Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef12ba60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef12ba60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef12ba60 Branch: refs/heads/ignite-sprint-4 Commit: ef12ba6007a1938abf64a97b2f020e2390b6a8e6 Parents: bad0161 Author: Valentin Kulichenko Authored: Thu Apr 9 21:18:29 2015 -0700 Committer: Valentin Kulichenko Committed: Thu Apr 9 21:18:29 2015 -0700 ---------------------------------------------------------------------- modules/core/pom.xml | 2 +- .../internal/managers/GridManagerAdapter.java | 4 + .../discovery/GridDiscoveryManager.java | 14 +++ .../org/apache/ignite/spi/IgniteSpiAdapter.java | 5 + .../org/apache/ignite/spi/IgniteSpiContext.java | 6 + .../communication/tcp/TcpCommunicationSpi.java | 47 ++++++-- .../ignite/spi/discovery/DiscoverySpi.java | 7 ++ .../discovery/tcp/TcpClientDiscoverySpi.java | 5 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 12 ++ .../ignite/internal/GridFailFastSelfTest.java | 112 +++++++++++++++++++ .../testframework/GridSpiTestContext.java | 5 + .../ignite/testsuites/IgniteBasicTestSuite.java | 1 + 12 files changed, 209 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 466ccb9..41444c8 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -9,7 +9,7 @@ the License. 
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - +IGNITE-714 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index c478d28..90031c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -536,6 +536,10 @@ public abstract class GridManagerAdapter implements GridMan return ctx.io().messageFactory(); } + @Override public boolean tryFailNode(UUID nodeId) { + return ctx.discovery().tryFailNode(nodeId); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 04ff423..856c523 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1335,6 +1335,20 @@ public class GridDiscoveryManager extends GridManagerAdapter { } /** + * @param nodeId Node ID. + * @return Whether node is failed. + */ + public boolean tryFailNode(UUID nodeId) { + if (!getSpi().pingNode(nodeId)) { + getSpi().failNode(nodeId); + + return true; + } + + return false; + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index f198210..7802d17 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -723,5 +723,10 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement @Override public boolean isStopping() { return stopping; } + + /** {@inheritDoc} */ + @Override public boolean tryFailNode(UUID nodeId) { + return false; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 5eef37a..e203387 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -351,4 +351,10 @@ public interface IgniteSpiContext { * @return {@code True} if node started shutdown sequence. */ public boolean isStopping(); + + /** + * @param nodeId Node ID. + * @return If node was failed. 
+ */ + public boolean tryFailNode(UUID nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ff84e5b..053f121 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -258,20 +258,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rmv.forceClose(); if (!isNodeStopping()) { - GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); + boolean failed = getSpiContext().tryFailNode(id); - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + if (!failed) { + GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - recoveryWorker.addReconnectRequest(recoveryData); + if (recoveryData != null) { + if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (!recoveryData.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Session was closed but there are unacknowledged messages, " + + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + + recoveryWorker.addReconnectRequest(recoveryData); + } } + else + recoveryData.onNodeLeft(); } - else - recoveryData.onNodeLeft(); } } } @@ -2054,6 +2058,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * Stops service threads to simulate node failure. 
+ * + * FOR TEST PURPOSES ONLY!!! + */ + void simulateNodeFailure() { + if (nioSrvr != null) + nioSrvr.stop(); + + U.interrupt(idleClientWorker); + U.interrupt(clientFlushWorker); + U.interrupt(sockTimeoutWorker); + U.interrupt(recoveryWorker); + + U.join(idleClientWorker, log); + U.join(clientFlushWorker, log); + U.join(sockTimeoutWorker, log); + U.join(recoveryWorker, log); + + for (GridCommunicationClient client : clients.values()) + client.forceClose(); + } + + /** * @param node Node. * @return Recovery receive data for given node. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 42bad22..7560999 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -144,4 +144,11 @@ public interface DiscoverySpi extends IgniteSpi { * @param evt Event. */ public void sendCustomEvent(Serializable evt); + + /** + * Initiates failure of provided node. + * + * @param nodeId Node ID. 
+ */ + public void failNode(UUID nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index bf69efb..8b3113f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -379,6 +379,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + // No-op. + } + /** * @param recon Reconnect flag. * @return Whether joined successfully. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index bad8837..1c09711 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1249,6 +1249,18 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt)); } + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId) { + ClusterNode node = ring.node(nodeId); + + if (node != null) { + TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), + node.id(), node.order()); + + msgWorker.addMessage(msg); + } + } + /** * Tries to join this node to topology. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java new file mode 100644 index 0000000..bc4de65 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastSelfTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.messaging.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Fail fast test. + */ +public class GridFailFastSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + disco.setHeartbeatFrequency(10000); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testFailFast() throws Exception { + startGridsMultiThreaded(5); + + final CountDownLatch failLatch = new CountDownLatch(4); + + for (int i = 0; i < 5; i++) { + ignite(i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info(evt.shortDisplay()); + + failLatch.countDown(); + + return true; + } + }, EventType.EVT_NODE_FAILED); + } + + Ignite ignite1 = ignite(0); + Ignite ignite2 = ignite(1); + + ignite1.message().localListen(null, new MessagingListenActor() { + @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable { + respond(rcvMsg); + } + }); + + ignite2.message().localListen(null, new MessagingListenActor() { + @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable { + respond(rcvMsg); + } + }); + + ignite1.message(ignite1.cluster().forRemotes()).send(null, "Message"); + + failNode(ignite1); + + assert failLatch.await(500, TimeUnit.MILLISECONDS); + } + + /** + * @param ignite Ignite. + * @throws Exception In case of error. 
+ */ + private void failNode(Ignite ignite) throws Exception { + DiscoverySpi disco = ignite.configuration().getDiscoverySpi(); + + U.invoke(disco.getClass(), disco, "simulateNodeFailure"); + + CommunicationSpi comm = ignite.configuration().getCommunicationSpi(); + + U.invoke(comm.getClass(), comm, "simulateNodeFailure"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index a4e011a..c0fe759 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -519,6 +519,11 @@ public class GridSpiTestContext implements IgniteSpiContext { return false; } + /** {@inheritDoc} */ + @Override public boolean tryFailNode(UUID nodeId) { + return false; + } + /** * @param cacheName Cache name. * @return Map representing cache. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef12ba60/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 7ae237f..081de2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -69,6 +69,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(GridSuppressedExceptionSelfTest.class); suite.addTestSuite(GridLifecycleAwareSelfTest.class); suite.addTestSuite(GridMessageListenSelfTest.class); + suite.addTestSuite(GridFailFastSelfTest.class); return suite; }