From commits-return-117558-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Tue Apr 10 10:59:33 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D49DD180718 for ; Tue, 10 Apr 2018 10:59:30 +0200 (CEST) Received: (qmail 40373 invoked by uid 500); 10 Apr 2018 08:59:29 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 40298 invoked by uid 99); 10 Apr 2018 08:59:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Apr 2018 08:59:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D27EF6082; Tue, 10 Apr 2018 08:59:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Tue, 10 Apr 2018 08:59:30 -0000 Message-Id: <6736f9b6b42942b9a6617f25639f5ed8@git.apache.org> In-Reply-To: <4c3400779b6f431ebd6bfa10411a1c58@git.apache.org> References: <4c3400779b6f431ebd6bfa10411a1c58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/12] ignite git commit: IGNITE-7222 Added ZooKeeper discovery SPI http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java new file mode 100644 index 0000000..fb12c3a --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -0,0 +1,4847 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.TestingZooKeeperServer; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CommunicationFailureContext; +import org.apache.ignite.configuration.CommunicationFailureResolver; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.DiscoverySpiTestListener; +import 
org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; 
+import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.security.SecurityCredentials; +import org.apache.ignite.plugin.security.SecurityPermission; +import org.apache.ignite.plugin.security.SecuritySubject; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestSuite2; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; +import org.apache.zookeeper.ZooKeeper; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2; +import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; +import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; + +/** + * Tests for {@link ZookeeperDiscoverySpi}. + */ +@SuppressWarnings("deprecation") +public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { + /** Default ZooKeeper root path of the discovery SPI. */ + private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH; + + /** Number of servers in the test ZooKeeper cluster. */ + private static final int ZK_SRVS = 3; + + /** Test ZooKeeper cluster shared by the tests of this class. */ + private static TestingCluster zkCluster; + + /** Whether to use the test ZooKeeper cluster; set to {@code false} to run against a real local ZK at {@code localhost:2181}. */ + private static final boolean USE_TEST_CLUSTER = true; + + /** Client mode flag applied to started nodes when no thread-local override is set. */ + private boolean client; + + /** Per-thread client mode override; takes precedence over {@link #client} when set. */ + private static ThreadLocal clientThreadLoc = new ThreadLocal<>(); + + /** Discovery events recorded by the local event listener, keyed by local node ID and then by topology version. */ + private static ConcurrentHashMap> evts = new ConcurrentHashMap<>(); + + /** Set to {@code true} when the discovery event listener hits an unexpected error. */ + private static volatile boolean err; + + /** If set, the ZooKeeper client socket implementation is replaced with {@link ZkTestClientCnxnSocketNIO}. */ + private boolean testSockNio; + + /** If set, nodes are configured with {@code ZkTestCommunicationSpi}. */ + private boolean testCommSpi; + + /** Session timeout passed to the discovery SPI; {@code 10_000} is used when not positive. */ + private long sesTimeout; + + /** Join timeout passed to the discovery SPI; not set when zero. */ + private long joinTimeout; + + /** Value passed to {@code ZookeeperDiscoverySpi.setClientReconnectDisabled}. */ + private boolean clientReconnectDisabled; + + /** Discovery SPI instances created by {@code getConfiguration}, keyed by Ignite instance name. */ + private ConcurrentHashMap spis = new ConcurrentHashMap<>(); + + /** Optional user attributes set on the node configuration. */ + private Map userAttrs; + + /** If set, the consistent ID is not overridden with the Ignite instance name. */ + private boolean dfltConsistenId; + + /** Optional explicit node ID. */ + private UUID nodeId; + + /** If set, nodes are configured with a persistent default data region. */ + private boolean persistence; + + /** Optional factory of the communication failure resolver. */ + private IgniteOutClosure commFailureRslvr; + + /** Optional factory of the node authenticator. */ + private IgniteOutClosure auth; + + /** Optional ZooKeeper root path; applied only when the test cluster is used. */ + private String zkRootPath; + + /** + * {@inheritDoc} + * + * Builds the node configuration from the fields of this test: ZooKeeper discovery SPI, default cache, client mode, a listener recording discovery events into {@code evts}, and the optional persistence, communication SPI and failure resolver settings. + */ + @Override protected IgniteConfiguration getConfiguration(final String igniteInstanceName) throws Exception { + if (testSockNio) + System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); + + IgniteConfiguration cfg = 
super.getConfiguration(igniteInstanceName); + + if (nodeId != null) + cfg.setNodeId(nodeId); + + if (!dfltConsistenId) + cfg.setConsistentId(igniteInstanceName); + + ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + + if (joinTimeout != 0) + zkSpi.setJoinTimeout(joinTimeout); + + zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); + + zkSpi.setClientReconnectDisabled(clientReconnectDisabled); + + // Set authenticator for basic sanity tests. + if (auth != null) { + zkSpi.setAuthenticator(auth.apply()); + + zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() { + @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) { + ZookeeperClusterNode locNode0 = (ZookeeperClusterNode)locNode; + + Map attrs = new HashMap<>(locNode0.getAttributes()); + + attrs.put(ATTR_SECURITY_CREDENTIALS, new SecurityCredentials(null, null, igniteInstanceName)); + + locNode0.setAttributes(attrs); + } + + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + return false; + } + }); + } + + spis.put(igniteInstanceName, zkSpi); + + if (USE_TEST_CLUSTER) { + assert zkCluster != null; + + zkSpi.setZkConnectionString(zkCluster.getConnectString()); + + if (zkRootPath != null) + zkSpi.setZkRootPath(zkRootPath); + } + else + zkSpi.setZkConnectionString("localhost:2181"); + + cfg.setDiscoverySpi(zkSpi); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + Boolean clientMode = clientThreadLoc.get(); + + if (clientMode != null) + cfg.setClientMode(clientMode); + else + cfg.setClientMode(client); + + if (userAttrs != null) + cfg.setUserAttributes(userAttrs); + + Map, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate() { + /** Ignite instance this listener belongs to. */ + @IgniteInstanceResource + private Ignite ignite; + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override 
public boolean apply(Event evt) { + try { + DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; + + UUID locId = ((IgniteKernal)ignite).context().localNodeId(); + + Map nodeEvts = evts.get(locId); + + if (nodeEvts == null) { + Object old = evts.put(locId, nodeEvts = new TreeMap<>()); + + assertNull(old); + + synchronized (nodeEvts) { + DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); + + nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + } + } + + synchronized (nodeEvts) { + DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); + + assertNull(old); + } + } + catch (Throwable e) { + error("Unexpected error [evt=" + evt + ", err=" + e + ']', e); + + err = true; + } + + return true; + } + }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); + + cfg.setLocalEventListeners(lsnrs); + + if (persistence) { + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024). + setPersistenceEnabled(true)) + .setPageSize(1024) + .setWalMode(WALMode.LOG_ONLY); + + cfg.setDataStorageConfiguration(memCfg); + } + + if (testCommSpi) + cfg.setCommunicationSpi(new ZkTestCommunicationSpi()); + + if (commFailureRslvr != null) + cfg.setCommunicationFailureResolver(commFailureRslvr.apply()); + + return cfg; + } + + /** + * Sets the client mode flag read by {@code getConfiguration} when no thread-local override is present. + * + * @param clientMode Client mode flag for started nodes. + */ + private void clientMode(boolean clientMode) { + client = clientMode; + } + + /** + * Sets the thread-local client mode override, which {@code getConfiguration} prefers over the instance flag. + * + * @param clientMode Client mode flag for nodes started from current thread. 
+ */ + private void clientModeThreadLocal(boolean clientMode) { + clientThreadLoc.set(clientMode); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT, "1000"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopZkCluster(); + + System.clearProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT); + + super.afterTestsStopped(); + } + + /** + * + */ + private void stopZkCluster() { + if (zkCluster != null) { + try { + zkCluster.close(); + } + catch (Exception e) { + U.error(log, "Failed to stop Zookeeper client: " + e, e); + } + + zkCluster = null; + } + } + + /** + * + */ + private static void ackEveryEventSystemProperty() { + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + + /** + * + */ + private void clearAckEveryEventSystemProperty() { + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + if (USE_TEST_CLUSTER && zkCluster == null) { + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); + + zkCluster.start(); + } + + reset(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + clearAckEveryEventSystemProperty(); + + try { + assertFalse("Unexpected error, see log for details", err); + + checkEventsConsistency(); + + checkInternalStructuresCleanup(); + + //TODO uncomment when https://issues.apache.org/jira/browse/IGNITE-8193 is fixed +// checkZkNodesCleanup(); + } + finally { + reset(); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. 
+ */ + private void checkInternalStructuresCleanup() throws Exception { + for (Ignite node : G.allGrids()) { + final AtomicReference res = GridTestUtils.getFieldValue(spi(node), "impl", "commErrProcFut"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return res.get() == null; + } + }, 30_000); + + assertNull(res.get()); + } + } + + /** + * Starts and stops a 5-node cluster three times with the ZooKeeper root path set to {@code /a/b/c}, checking events consistency after every round. + * + * @throws Exception If failed. + */ + public void testZkRootNotExists() throws Exception { + zkRootPath = "/a/b/c"; + + for (int i = 0; i < 3; i++) { + reset(); + + startGridsMultiThreaded(5); + + waitForTopology(5); + + stopAllGrids(); + + checkEventsConsistency(); + } + } + + /** + * Starts a single node and marshals {@code C1} and {@code C2} instances with its marshaller concurrently from 64 threads. + * + * @throws Exception If failed. + */ + public void testMetadataUpdate() throws Exception { + startGrid(0); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + ignite(0).configuration().getMarshaller().marshal(new C1()); + ignite(0).configuration().getMarshaller().marshal(new C2()); + + return null; + } + }, 64, "marshal"); + } + + /** + * Starts 3 nodes, then 3 more with the client mode flag set, and checks that every node reports non-empty addresses and host names for itself and for every node it sees. + * + * @throws Exception If failed. + */ + public void testNodeAddresses() throws Exception { + startGridsMultiThreaded(3); + + clientMode(true); + + startGridsMultiThreaded(3, 3); + + waitForTopology(6); + + for (Ignite node : G.allGrids()) { + ClusterNode locNode0 = node.cluster().localNode(); + + assertTrue(locNode0.addresses().size() > 0); + assertTrue(locNode0.hostNames().size() > 0); + + for (ClusterNode node0 : node.cluster().nodes()) { + assertTrue(node0.addresses().size() > 0); + assertTrue(node0.hostNames().size() > 0); + } + } + } + + /** + * Starts 3 nodes, then 3 more with the client mode flag set, and checks that the consistent ID of every node equals its {@code ATTR_IGNITE_INSTANCE_NAME} attribute, both locally and as seen from the other nodes. + * + * @throws Exception If failed. 
+ */ + public void testSetConsistentId() throws Exception { + startGridsMultiThreaded(3); + + clientMode(true); + + startGridsMultiThreaded(3, 3); + + waitForTopology(6); + + for (Ignite node : G.allGrids()) { + ClusterNode locNode0 = node.cluster().localNode(); + + assertEquals(locNode0.attribute(ATTR_IGNITE_INSTANCE_NAME), + locNode0.consistentId()); + + for (ClusterNode node0 : node.cluster().nodes()) { + assertEquals(node0.attribute(ATTR_IGNITE_INSTANCE_NAME), + node0.consistentId()); + } + } + } + + /** + * Same topology as {@code testSetConsistentId()}, but with {@code dfltConsistenId} set so the consistent ID is not overridden; only checks that every node has a non-null consistent ID. + * + * @throws Exception If failed. + */ + public void testDefaultConsistentId() throws Exception { + dfltConsistenId = true; + + startGridsMultiThreaded(3); + + clientMode(true); + + startGridsMultiThreaded(3, 3); + + waitForTopology(6); + + for (Ignite node : G.allGrids()) { + ClusterNode locNode0 = node.cluster().localNode(); + + assertNotNull(locNode0.consistentId()); + + for (ClusterNode node0 : node.cluster().nodes()) + assertNotNull(node0.consistentId()); + } + } + + /** + * Starts and stops nodes with the client mode flag on and off and checks after every step that each running node sees the expected number of nodes in {@code forClients()} and {@code forServers()}. + * + * @throws Exception If failed. 
+ */ + public void testClientNodesStatus() throws Exception { + startGrid(0); + + for (Ignite node : G.allGrids()) { + assertEquals(0, node.cluster().forClients().nodes().size()); + assertEquals(1, node.cluster().forServers().nodes().size()); + } + + clientMode(true); + + startGrid(1); + + for (Ignite node : G.allGrids()) { + assertEquals(1, node.cluster().forClients().nodes().size()); + assertEquals(1, node.cluster().forServers().nodes().size()); + } + + clientMode(false); + + startGrid(2); + + clientMode(true); + + startGrid(3); + + for (Ignite node : G.allGrids()) { + assertEquals(2, node.cluster().forClients().nodes().size()); + assertEquals(2, node.cluster().forServers().nodes().size()); + } + + stopGrid(1); + + waitForTopology(3); + + for (Ignite node : G.allGrids()) { + assertEquals(1, node.cluster().forClients().nodes().size()); + assertEquals(2, node.cluster().forServers().nodes().size()); + } + + stopGrid(2); + + waitForTopology(2); + + for (Ignite node : G.allGrids()) { + assertEquals(1, node.cluster().forClients().nodes().size()); + assertEquals(1, node.cluster().forServers().nodes().size()); + } + } + + /** + * Expects start of node 0 to fail with "Authentication failed for local node" when the authenticator factory is created for its instance name, after which nodes 1 and 2 are started and their security subjects checked. The local variable {@code err} shadows the static {@code err} flag of the class. NOTE(review): the leading underscore in the method name presumably keeps the test from being run - confirm. + * + * @throws Exception If failed. + */ + public void _testLocalAuthenticationFails() throws Exception { + auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(0)); + + Throwable err = GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + startGrid(0); + + return null; + } + }, IgniteCheckedException.class, null); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Authentication failed for local node")); + + startGrid(1); + startGrid(2); + + checkTestSecuritySubject(2); + } + + /** + * Creates the authenticator factory for the instance names of nodes 1 and 5, then checks that those nodes fail to start in both client and non-client mode while other nodes start, stop and restart normally and carry the expected security subject. + * + * @throws Exception If failed. 
+ */ + public void testAuthentication() throws Exception { + auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(1), + getTestIgniteInstanceName(5)); + + startGrid(0); + + checkTestSecuritySubject(1); + + { + clientMode(false); + checkStartFail(1); + + clientMode(true); + checkStartFail(1); + + clientMode(false); + } + + startGrid(2); + + checkTestSecuritySubject(2); + + stopGrid(2); + + checkTestSecuritySubject(1); + + startGrid(2); + + checkTestSecuritySubject(2); + + stopGrid(0); + + checkTestSecuritySubject(1); + + checkStartFail(1); + + clientMode(false); + + startGrid(3); + + clientMode(true); + + startGrid(4); + + clientMode(false); + + startGrid(0); + + checkTestSecuritySubject(4); + + checkStartFail(1); + checkStartFail(5); + + clientMode(true); + + checkStartFail(1); + checkStartFail(5); + } + + /** + * Asserts that starting the node with the given index throws {@code IgniteCheckedException} whose cause chain contains an {@code IgniteSpiException} with a message containing "Authentication failed". + * + * @param nodeIdx Node index. + */ + private void checkStartFail(final int nodeIdx) { + Throwable err = GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + startGrid(nodeIdx); + + return null; + } + }, IgniteCheckedException.class, null); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Authentication failed")); + } + + /** + * Waits for the expected topology size, then checks on every started node that each cluster node carries an {@code ATTR_SECURITY_SUBJECT_V2} attribute which unmarshals to a test security context whose node name equals the node's instance name attribute. + * + * @param expNodes Expected nodes number. + * @throws Exception If failed. 
+ */ + private void checkTestSecuritySubject(int expNodes) throws Exception { + waitForTopology(expNodes); + + List nodes = G.allGrids(); + + JdkMarshaller marsh = new JdkMarshaller(); + + for (Ignite ignite : nodes) { + Collection nodes0 = ignite.cluster().nodes(); + + assertEquals(nodes.size(), nodes0.size()); + + for (ClusterNode node : nodes0) { + byte[] secSubj = node.attribute(ATTR_SECURITY_SUBJECT_V2); + + assertNotNull(secSubj); + + ZkTestNodeAuthenticator.TestSecurityContext secCtx = marsh.unmarshal(secSubj, null); + + assertEquals(node.attribute(ATTR_IGNITE_INSTANCE_NAME), secCtx.nodeName); + } + } + } + + /** + * Starts 5 nodes, stops node 3 and starts it again, waiting for the expected topology size after every step. + * + * @throws Exception If failed. + */ + public void testStopNode_1() throws Exception { + startGrids(5); + + waitForTopology(5); + + stopGrid(3); + + waitForTopology(4); + + startGrid(3); + + waitForTopology(5); + } + + /** + * With the ack threshold property set to 1, creates cache {@code c1} on a single node and waits for event acks. + * + * @throws Exception If failed. + */ + public void testCustomEventsSimple1_SingleNode() throws Exception { + ackEveryEventSystemProperty(); + + Ignite srv0 = startGrid(0); + + srv0.createCache(new CacheConfiguration<>("c1")); + + waitForEventsAcks(srv0); + } + + /** + * With the ack threshold property set to 1, creates cache {@code c1} in a 5-node cluster, awaits partition map exchange and waits for event acks. + * + * @throws Exception If failed. + */ + public void testCustomEventsSimple1_5_Nodes() throws Exception { + ackEveryEventSystemProperty(); + + Ignite srv0 = startGrids(5); + + srv0.createCache(new CacheConfiguration<>("c1")); + + awaitPartitionMapExchange(); + + waitForEventsAcks(srv0); + } + + /** + * Runs {@code customEvents_FastStopProcess} with 1 server and no clients. + * + * @throws Exception If failed. + */ + public void testCustomEvents_FastStopProcess_1() throws Exception { + customEvents_FastStopProcess(1, 0); + } + + /** + * Runs {@code customEvents_FastStopProcess} with 5 servers and 5 clients. + * + * @throws Exception If failed. + */ + public void testCustomEvents_FastStopProcess_2() throws Exception { + customEvents_FastStopProcess(5, 5); + } + + /** + * Sends two {@code TestFastStopProcessCustomMessage}s from every node, constructed with {@code false} and then {@code true}, and checks what is received. For the first message only the first started node ({@code crd}) is expected to receive it and the other nodes nothing. For the second, {@code crd} is expected to receive the message followed by a {@code TestFastStopProcessCustomMessageAck} sent from {@code crd}, and the other nodes only that ack. + * + * @param srvs Servers number. + * @param clients Clients number. + * @throws Exception If failed. 
+ */ + private void customEvents_FastStopProcess(int srvs, int clients) throws Exception { + ackEveryEventSystemProperty(); + + Map>> rcvdMsgs = + new ConcurrentHashMap<>(); + + Ignite crd = startGrid(0); + + UUID crdId = crd.cluster().localNode().id(); + + if (srvs > 1) + startGridsMultiThreaded(1, srvs - 1); + + if (clients > 0) { + client = true; + + startGridsMultiThreaded(srvs, clients); + } + + awaitPartitionMapExchange(); + + List nodes = G.allGrids(); + + assertEquals(srvs + clients, nodes.size()); + + for (Ignite node : nodes) + registerTestEventListeners(node, rcvdMsgs); + + int payload = 0; + + AffinityTopologyVersion topVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx(); + + for (Ignite node : nodes) { + UUID sndId = node.cluster().localNode().id(); + + info("Send from node: " + sndId); + + GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery(); + + { + List> expCrdMsgs = new ArrayList<>(); + List> expNodesMsgs = Collections.emptyList(); + + TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(false, payload++); + + expCrdMsgs.add(new T3(topVer, sndId, msg)); + + discoveryMgr.sendCustomEvent(msg); + + doSleep(200); // Wait some time to check extra messages are not received. + + checkEvents(crd, rcvdMsgs, expCrdMsgs); + + for (Ignite node0 : nodes) { + if (node0 != crd) + checkEvents(node0, rcvdMsgs, expNodesMsgs); + } + + rcvdMsgs.clear(); + } + { + List> expCrdMsgs = new ArrayList<>(); + List> expNodesMsgs = new ArrayList<>(); + + TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(true, payload++); + + expCrdMsgs.add(new T3(topVer, sndId, msg)); + + discoveryMgr.sendCustomEvent(msg); + + TestFastStopProcessCustomMessageAck ackMsg = new TestFastStopProcessCustomMessageAck(msg.payload); + + expCrdMsgs.add(new T3(topVer, crdId, ackMsg)); + expNodesMsgs.add(new T3(topVer, crdId, ackMsg)); + + doSleep(200); // Wait some time to check extra messages are not received. 
+ + checkEvents(crd, rcvdMsgs, expCrdMsgs); + + for (Ignite node0 : nodes) { + if (node0 != crd) + checkEvents(node0, rcvdMsgs, expNodesMsgs); + } + + rcvdMsgs.clear(); + } + + waitForEventsAcks(crd); + } + } + + /** + * Waits until at least as many messages as expected were recorded for the given node, then asserts that the recorded messages equal the expected ones; a missing entry for the node is treated as an empty list. + * + * @param node Node to check. + * @param rcvdMsgs Received messages. + * @param expMsgs Expected messages. + * @throws Exception If failed. + */ + private void checkEvents( + Ignite node, + final Map>> rcvdMsgs, + final List> expMsgs) throws Exception { + final UUID nodeId = node.cluster().localNode().id(); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + List> msgs = rcvdMsgs.get(nodeId); + + int size = msgs == null ? 0 : msgs.size(); + + return size >= expMsgs.size(); + } + }, 5000)); + + List> msgs = rcvdMsgs.get(nodeId); + + if (msgs == null) + msgs = Collections.emptyList(); + + assertEqualsCollections(expMsgs, msgs); + } + + /** + * Registers custom event listeners for {@code TestFastStopProcessCustomMessage} and {@code TestFastStopProcessCustomMessageAck} on the given node; each listener appends the topology version, sender ID and message to the node's list in {@code rcvdMsgs}, creating the list on first use. + * + * @param node Node. + * @param rcvdMsgs Map to store received events. + */ + private void registerTestEventListeners(Ignite node, + final Map>> rcvdMsgs) { + GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery(); + + final UUID nodeId = node.cluster().localNode().id(); + + discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) { + List> list = rcvdMsgs.get(nodeId); + + if (list == null) + rcvdMsgs.put(nodeId, list = new ArrayList<>()); + + list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg)); + } + } + ); + discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessageAck.class, + new CustomEventListener() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) { + List> list = rcvdMsgs.get(nodeId); + + if (list == null) + rcvdMsgs.put(nodeId, list = new ArrayList<>()); 
+ + list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg)); + } + } + ); + } + + /** + * Starts a node with session timeout 2000 and the test client socket, closes the socket with the {@code true} flag, polls for up to 10 seconds for {@code EVT_NODE_SEGMENTED}, then allows the connection again and asserts the event was fired. + * + * @throws Exception If failed. + */ + public void testSegmentation1() throws Exception { + sesTimeout = 2000; + testSockNio = true; + + Ignite node0 = startGrid(0); + + final CountDownLatch l = new CountDownLatch(1); + + node0.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + l.countDown(); + + return false; + } + }, EventType.EVT_NODE_SEGMENTED); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(true); + + for (int i = 0; i < 10; i++) { + Thread.sleep(1_000); + + if (l.getCount() == 0) + break; + } + + info("Allow connect"); + + c0.allowConnect(); + + assertTrue(l.await(10, TimeUnit.SECONDS)); + } + + /** + * Starts a node with session timeout 2000, closes the whole test ZooKeeper cluster and asserts that {@code EVT_NODE_SEGMENTED} is fired within 10 seconds; the cluster is recreated and started in {@code finally}. + * + * @throws Exception If failed. + */ + public void testSegmentation2() throws Exception { + sesTimeout = 2000; + + Ignite node0 = startGrid(0); + + final CountDownLatch l = new CountDownLatch(1); + + node0.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + l.countDown(); + + return false; + } + }, EventType.EVT_NODE_SEGMENTED); + + try { + zkCluster.close(); + + assertTrue(l.await(10, TimeUnit.SECONDS)); + } + finally { + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); + + zkCluster.start(); + } + } + + /** + * Currently fails immediately with a reference to IGNITE-8183, so the rest of the body does not run. The body stops two of the three ZooKeeper servers and expects {@code EVT_NODE_SEGMENTED} within 20 seconds, recreating the cluster afterwards. + * + * @throws Exception If failed. 
+ */ + public void testSegmentation3() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8183"); + + sesTimeout = 5000; + + Ignite node0 = startGrid(0); + + final CountDownLatch l = new CountDownLatch(1); + + node0.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + l.countDown(); + + return false; + } + }, EventType.EVT_NODE_SEGMENTED); + + List srvs = zkCluster.getServers(); + + assertEquals(3, srvs.size()); + + try { + srvs.get(0).stop(); + srvs.get(1).stop(); + + assertTrue(l.await(20, TimeUnit.SECONDS)); + } + finally { + zkCluster.close(); + + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); + + zkCluster.start(); + } + } + + /** + * Currently fails immediately with a reference to IGNITE-8180, so the rest of the body does not run. The body starts 3 nodes, stops two of the three ZooKeeper servers, restarts one of them, then starts node 4 and waits for a topology of 4, recreating the cluster afterwards. + * + * @throws Exception If failed. + */ + public void testQuorumRestore() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8180"); + + sesTimeout = 15_000; + + startGrids(3); + + waitForTopology(3); + + List srvs = zkCluster.getServers(); + + assertEquals(3, srvs.size()); + + try { + srvs.get(0).stop(); + srvs.get(1).stop(); + + U.sleep(2000); + + srvs.get(1).restart(); + + U.sleep(4000); + + startGrid(4); + + waitForTopology(4); + } + finally { + zkCluster.close(); + + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); + + zkCluster.start(); + } + } + + /** + * Starts node 0 with the test client socket, closes its socket with the {@code false} flag and then starts node 1. + * + * @throws Exception If failed. + */ + public void testConnectionRestore1() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGrid(1); + } + + /** + * Starts node 0 with the test client socket, closes its socket with the {@code false} flag and then starts 5 more nodes concurrently. + * + * @throws Exception If failed. + */ + public void testConnectionRestore2() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGridsMultiThreaded(1, 5); + } + + /** + * Runs {@code connectionRestore_NonCoordinator} without failing the joining node. + * + * @throws Exception If failed. 
+ */ + public void testConnectionRestore_NonCoordinator1() throws Exception { + connectionRestore_NonCoordinator(false); + } + + /** + * Runs {@code connectionRestore_NonCoordinator} failing the joining node while node 1 is disconnected. + * + * @throws Exception If failed. + */ + public void testConnectionRestore_NonCoordinator2() throws Exception { + connectionRestore_NonCoordinator(true); + } + + /** + * Starts nodes 0 and 1, closes the ZooKeeper socket of node 1 and starts node 2 asynchronously. Node 0 is expected to see the join event (and, if requested, the fail event after the ZK client of node 2 is closed) right away; node 1 is expected to see the same events once its connection is allowed again. + * + * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. + * @throws Exception If failed. + */ + private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + Ignite node1 = startGrid(1); + + ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); + + c1.closeSocket(true); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() { + try { + startGrid(2); + } + catch (Exception e) { + info("Start error: " + e); + } + + return null; + } + }, "start-node"); + + checkEvents(node0, joinEvent(3)); + + if (failWhenDisconnected) { + ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); + + closeZkClient(spi); + + checkEvents(node0, failEvent(4)); + } + + c1.allowConnect(); + + checkEvents(ignite(1), joinEvent(3)); + + if (failWhenDisconnected) { + checkEvents(ignite(1), failEvent(4)); + + IgnitionEx.stop(getTestIgniteInstanceName(2), true, true); + } + + fut.get(); + + waitForTopology(failWhenDisconnected ? 2 : 3); + } + + /** + * Runs {@code connectionRestore_Coordinator} with 1 initial node, 1 started node and no failed nodes. + * + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator1() throws Exception { + connectionRestore_Coordinator(1, 1, 0); + } + + /** + * Runs {@code connectionRestore_Coordinator} with 1 initial node, 1 started node and 1 failed node. + * + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator1_1() throws Exception { + connectionRestore_Coordinator(1, 1, 1); + } + + /** + * Runs {@code connectionRestore_Coordinator} with 1 initial node, 3 started nodes and no failed nodes. + * + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator2() throws Exception { + connectionRestore_Coordinator(1, 3, 0); + } + + /** + * Runs {@code connectionRestore_Coordinator} with 3 initial nodes, 3 started nodes and no failed nodes. + * + * @throws Exception If failed. 
+ */ + public void testConnectionRestore_Coordinator3() throws Exception { + connectionRestore_Coordinator(3, 3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator4() throws Exception { + connectionRestore_Coordinator(3, 3, 1); + } + + /** + * @param initNodes Number of initially started nodes. + * @param startNodes Number of nodes to start after coordinator loose connection. + * @param failCnt Number of nodes to stop after coordinator loose connection. + * @throws Exception If failed. + */ + private void connectionRestore_Coordinator(final int initNodes, int startNodes, int failCnt) throws Exception { + sesTimeout = 30_000; + testSockNio = true; + + Ignite node0 = startGrids(initNodes); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(true); + + final AtomicInteger nodeIdx = new AtomicInteger(initNodes); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() { + try { + startGrid(nodeIdx.getAndIncrement()); + } + catch (Exception e) { + error("Start failed: " + e); + } + + return null; + } + }, startNodes, "start-node"); + + int cnt = 0; + + DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; + + int expEvtCnt = 0; + + sesTimeout = 1000; + + List blockedC = new ArrayList<>(); + + final List failedZkNodes = new ArrayList<>(failCnt); + + for (int i = initNodes; i < initNodes + startNodes; i++) { + final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Object spiImpl = GridTestUtils.getFieldValue(spi, "impl"); + + if (spiImpl == null) + return false; + + long internalOrder = GridTestUtils.getFieldValue(spiImpl, "rtState", "internalOrder"); + + return internalOrder > 0; + } + }, 10_000)); + + if (cnt++ < failCnt) { + ZkTestClientCnxnSocketNIO c = 
ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); + + c.closeSocket(true); + + blockedC.add(c); + + failedZkNodes.add(aliveZkNodePath(spi)); + } + else { + expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); + + expEvtCnt++; + } + } + + waitNoAliveZkNodes(log, zkCluster.getConnectString(), failedZkNodes, 30_000); + + c0.allowConnect(); + + for (ZkTestClientCnxnSocketNIO c : blockedC) + c.allowConnect(); + + if (expEvts.length > 0) { + for (int i = 0; i < initNodes; i++) + checkEvents(ignite(i), expEvts); + } + + fut.get(); + + waitForTopology(initNodes + startNodes - failCnt); + } + + /** + * @param node Node. + * @return Corresponding znode. + */ + private static String aliveZkNodePath(Ignite node) { + return aliveZkNodePath(node.configuration().getDiscoverySpi()); + } + + /** + * @param spi SPI. + * @return Znode related to given SPI. + */ + private static String aliveZkNodePath(DiscoverySpi spi) { + String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath"); + + return path.substring(path.lastIndexOf('/') + 1); + } + + /** + * @param log Logger. + * @param connectString Zookeeper connect string. + * @param failedZkNodes Znodes which should be removed. + * @param timeout Timeout. + * @throws Exception If failed. 
+ */ + private static void waitNoAliveZkNodes(final IgniteLogger log, + String connectString, + final List failedZkNodes, + long timeout) + throws Exception + { + final ZookeeperClient zkClient = new ZookeeperClient(log, connectString, 10_000, null); + + try { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + List c = zkClient.getChildren(IGNITE_ZK_ROOT + "/" + ZkIgnitePaths.ALIVE_NODES_DIR); + + for (String failedZkNode : failedZkNodes) { + if (c.contains(failedZkNode)) { + log.info("Alive node is not removed [node=" + failedZkNode + ", all=" + c + ']'); + + return false; + } + } + + return true; + } + catch (Exception e) { + e.printStackTrace(); + + fail(); + + return true; + } + } + }, timeout)); + } + finally { + zkClient.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartWithClient() throws Exception { + final int NODES = 20; + + for (int i = 0; i < 3; i++) { + info("Iteration: " + i); + + final int srvIdx = ThreadLocalRandom.current().nextInt(NODES); + + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + int threadIdx = idx.getAndIncrement(); + + clientModeThreadLocal(threadIdx == srvIdx || ThreadLocalRandom.current().nextBoolean()); + + startGrid(threadIdx); + + return null; + } + }, NODES, "start-node"); + + waitForTopology(NODES); + + stopAllGrids(); + + checkEventsConsistency(); + + evts.clear(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testConcurrentStart() throws Exception { + final int NODES = 20; + + for (int i = 0; i < 3; i++) { + info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(); + + final CyclicBarrier b = new CyclicBarrier(NODES); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + b.await(); + + int threadIdx = idx.getAndIncrement(); + + startGrid(threadIdx); + + return null; + } + }, NODES, "start-node"); + + waitForTopology(NODES); + + stopAllGrids(); + + checkEventsConsistency(); + + evts.clear(); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartStop1() throws Exception { + concurrentStartStop(1); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartStop2() throws Exception { + concurrentStartStop(5); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartStop2_EventsThrottle() throws Exception { + System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS, "1"); + + try { + concurrentStartStop(5); + } + finally { + System.clearProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS); + } + } + + /** + * @param initNodes Number of initially started nnodes. + * @throws Exception If failed. 
+ */ + private void concurrentStartStop(final int initNodes) throws Exception { + startGrids(initNodes); + + final int NODES = 5; + + long topVer = initNodes; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + DiscoveryEvent[] expEvts = new DiscoveryEvent[NODES]; + + startGridsMultiThreaded(initNodes, NODES); + + for (int j = 0; j < NODES; j++) + expEvts[j] = joinEvent(++topVer); + + checkEvents(ignite(0), expEvts); + + checkEventsConsistency(); + + final CyclicBarrier b = new CyclicBarrier(NODES); + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + try { + b.await(); + + stopGrid(initNodes + idx); + } + catch (Exception e) { + e.printStackTrace(); + + fail(); + } + } + }, NODES, "stop-node"); + + for (int j = 0; j < NODES; j++) + expEvts[j] = failEvent(++topVer); + + checkEventsConsistency(); + } + } + + /** + * @throws Exception If failed. + */ + public void testClusterRestart() throws Exception { + startGridsMultiThreaded(3, false); + + stopAllGrids(); + + evts.clear(); + + startGridsMultiThreaded(3, false); + + waitForTopology(3); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore4() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGrid(1); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_1_Node() throws Exception { + startGrid(0); + + waitForTopology(1); + + stopGrid(0); + } + + /** + * @throws Exception If failed. + */ + public void testRestarts_2_Nodes() throws Exception { + startGrid(0); + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + startGrid(1); + + waitForTopology(2); + + stopGrid(1); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testStartStop_2_Nodes_WithCache() throws Exception { + startGrids(2); + + for (Ignite node : G.allGrids()) { + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); + + assertNotNull(cache); + + for (int i = 0; i < 100; i++) { + cache.put(i, node.name()); + + assertEquals(node.name(), cache.get(i)); + } + } + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_2_Nodes() throws Exception { + ackEveryEventSystemProperty(); + + startGrid(0); + + waitForTopology(1); + + startGrid(1); + + waitForTopology(2); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + + awaitPartitionMapExchange(); + + waitForEventsAcks(ignite(0)); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleClusters() throws Exception { + Ignite c0 = startGrid(0); + + zkRootPath = "/cluster2"; + + Ignite c1 = startGridsMultiThreaded(1, 5); + + zkRootPath = "/cluster3"; + + Ignite c2 = startGridsMultiThreaded(6, 3); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 5); + checkNodesNumber(c2, 3); + + stopGrid(2); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + checkNodesNumber(c2, 3); + + for (int i = 0; i < 3; i++) + stopGrid(i + 6); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + + c2 = startGridsMultiThreaded(6, 2); + + checkNodesNumber(c0, 1); + checkNodesNumber(c1, 4); + checkNodesNumber(c2, 2); + + evts.clear(); + } + + /** + * @param node Node. + * @param expNodes Expected node in cluster. + * @throws Exception If failed. + */ + private void checkNodesNumber(final Ignite node, final int expNodes) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return node.cluster().nodes().size() == expNodes; + } + }, 5000); + + assertEquals(expNodes, node.cluster().nodes().size()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testStartStop1() throws Exception { + ackEveryEventSystemProperty(); + + startGridsMultiThreaded(5, false); + + waitForTopology(5); + + awaitPartitionMapExchange(); + + waitForEventsAcks(ignite(0)); + + stopGrid(0); + + waitForTopology(4); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + + startGrid(0); + + waitForTopology(5); + + awaitPartitionMapExchange(); + + waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop3() throws Exception { + startGrids(4); + + awaitPartitionMapExchange(); + + stopGrid(0); + + startGrid(5); + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop4() throws Exception { + startGrids(6); + + awaitPartitionMapExchange(); + + stopGrid(2); + + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + stopGrid(1); + + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + stopGrid(0); + + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + startGrid(7); + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop2() throws Exception { + startGridsMultiThreaded(10, false); + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + stopGrid(idx); + } + }, 3, "stop-node-thread"); + + waitForTopology(7); + + startGridsMultiThreaded(0, 3); + + waitForTopology(10); + } + + /** + * @throws Exception If failed. 
+ */ + public void testStartStopWithClients() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + clientMode(true); + + final int THREADS = 30; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + startGridsMultiThreaded(SRVS, THREADS); + + waitForTopology(SRVS + THREADS); + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + stopGrid(idx + SRVS); + } + }, THREADS, "stop-node"); + + waitForTopology(SRVS); + + checkEventsConsistency(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTopologyChangeMultithreaded() throws Exception { + topologyChangeWithRestarts(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTopologyChangeMultithreaded_RestartZk() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8184"); + + try { + topologyChangeWithRestarts(true, false); + } + finally { + zkCluster.stop(); + + zkCluster = null; + } + } + + /** + * @throws Exception If failed. + */ + public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8184"); + + try { + topologyChangeWithRestarts(true, true); + } + finally { + zkCluster.stop(); + + zkCluster = null; + } + } + + /** + * @param restartZk If {@code true} in background restarts on of ZK servers. + * @param closeClientSock If {@code true} in background closes zk clients' sockets. + * @throws Exception If failed. + */ + private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception { + sesTimeout = 30_000; + + if (closeClientSock) + testSockNio = true; + + long stopTime = System.currentTimeMillis() + 60_000; + + AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture fut1 = null; + + IgniteInternalFuture fut2 = null; + + try { + fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null; + fut2 = closeClientSock ? 
startCloseZkClientSocket(stopTime, stop) : null; + + int INIT_NODES = 10; + + startGridsMultiThreaded(INIT_NODES); + + final int MAX_NODES = 20; + + final List startedNodes = new ArrayList<>(); + + for (int i = 0; i < INIT_NODES; i++) + startedNodes.add(i); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final AtomicInteger startIdx = new AtomicInteger(INIT_NODES); + + while (System.currentTimeMillis() < stopTime) { + if (startedNodes.size() >= MAX_NODES) { + int stopNodes = rnd.nextInt(5) + 1; + + log.info("Next, stop nodes: " + stopNodes); + + final List idxs = new ArrayList<>(); + + while (idxs.size() < stopNodes) { + Integer stopIdx = rnd.nextInt(startedNodes.size()); + + if (!idxs.contains(stopIdx)) + idxs.add(startedNodes.get(stopIdx)); + } + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer threadIdx) { + int stopNodeIdx = idxs.get(threadIdx); + + info("Stop node: " + stopNodeIdx); + + stopGrid(stopNodeIdx); + } + }, stopNodes, "stop-node"); + + startedNodes.removeAll(idxs); + } + else { + int startNodes = rnd.nextInt(5) + 1; + + log.info("Next, start nodes: " + startNodes); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + int idx = startIdx.incrementAndGet(); + + log.info("Start node: " + idx); + + startGrid(idx); + + synchronized (startedNodes) { + startedNodes.add(idx); + } + + return null; + } + }, startNodes, "start-node"); + } + + U.sleep(rnd.nextInt(100) + 1); + } + } + finally { + stop.set(true); + } + + if (fut1 != null) + fut1.get(); + + if (fut2 != null) + fut2.get(); + } + + /** + * @throws Exception If failed. + */ + public void testRandomTopologyChanges() throws Exception { + randomTopologyChanges(false, false); + } + + /** + * @throws Exception If failed. 
+ */ + private void checkZkNodesCleanup() throws Exception { + final ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(), + zkCluster.getConnectString(), + 30_000, + null); + + final String basePath = IGNITE_ZK_ROOT + "/"; + + final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/"; + + try { + List znodes = listSubTree(zkClient.zk(), IGNITE_ZK_ROOT); + + boolean foundAlive = false; + + for (String znode : znodes) { + if (znode.startsWith(aliveDir)) { + foundAlive = true; + + break; + } + } + + assertTrue(foundAlive); // Sanity check to make sure we check correct directory. + + assertTrue("Failed to wait for unused znodes cleanup", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + List znodes = listSubTree(zkClient.zk(), IGNITE_ZK_ROOT); + + for (String znode : znodes) { + if (znode.startsWith(aliveDir) || znode.length() < basePath.length()) + continue; + + znode = znode.substring(basePath.length()); + + if (!znode.contains("/")) // Ignore roots. + continue; + + // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193 + if (znode.startsWith("jd/")) + continue; + + log.info("Found unexpected znode: " + znode); + + return false; + } + + return true; + } + catch (Exception e) { + error("Unexpected error: " + e, e); + + fail("Unexpected error: " + e); + } + + return false; + } + }, 10_000)); + } + finally { + zkClient.close(); + } + } + + /** + * @throws Exception If failed. + */ + public void testRandomTopologyChanges_RestartZk() throws Exception { + randomTopologyChanges(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testRandomTopologyChanges_CloseClients() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8182"); + + randomTopologyChanges(false, true); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDeployService1() throws Exception { + startGridsMultiThreaded(3); + + grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); + } + + /** + * @throws Exception If failed. + */ + public void testDeployService2() throws Exception { + clientMode(false); + + startGrid(0); + + clientMode(true); + + startGrid(1); + + grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); + } + + /** + * @throws Exception If failed. + */ + public void testDeployService3() throws Exception { + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + clientModeThreadLocal(true); + + startGrid(0); + + return null; + } + }, "start-node"); + + clientModeThreadLocal(false); + + startGrid(1); + + fut.get(); + + grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); + } + + /** + * @throws Exception If failed. + */ + public void testLargeUserAttribute1() throws Exception { + initLargeAttribute(); + + startGrid(0); + + checkZkNodesCleanup(); + + userAttrs = null; + + startGrid(1); + + waitForEventsAcks(ignite(0)); + + waitForTopology(2); + } + + /** + * @throws Exception If failed. + */ + public void testLargeUserAttribute2() throws Exception { + startGrid(0); + + initLargeAttribute(); + + startGrid(1); + + waitForEventsAcks(ignite(0)); + + checkZkNodesCleanup(); + } + + /** + * @throws Exception If failed. 
+     */
+    public void testLargeUserAttribute3() throws Exception {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        long stopTime = System.currentTimeMillis() + 60_000;
+
+        int nodes = 0;
+
+        for (int i = 0; i < 25; i++) {
+            info("Iteration: " + i);
+
+            if (rnd.nextBoolean())
+                initLargeAttribute();
+            else
+                userAttrs = null;
+
+            clientMode(i > 5);
+
+            startGrid(i);
+
+            nodes++;
+
+            // Limit total test time.
+            if (System.currentTimeMillis() >= stopTime)
+                break;
+        }
+
+        waitForTopology(nodes);
+    }
+
+    /**
+     * Initializes {@code userAttrs} with a single attribute holding an array of more than 1M ints.
+     */
+    private void initLargeAttribute() {
+        userAttrs = new HashMap<>();
+
+        int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024)];
+
+        for (int i = 0; i < attr.length; i++)
+            attr[i] = i;
+
+        userAttrs.put("testAttr", attr);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLargeCustomEvent() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        // Send large message, single node in topology.
+        IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1"));
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        assertEquals(1, cache.get(1));
+
+        waitForEventsAcks(ignite(0));
+
+        startGridsMultiThreaded(1, 3);
+
+        srv0.destroyCache("c1");
+
+        // Send large message, multiple nodes in topology.
+        cache = srv0.createCache(largeCacheConfiguration("c1"));
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        waitForTopology(4);
+
+        ignite(3).createCache(largeCacheConfiguration("c2"));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectSessionExpire1_1() throws Exception {
+        // Test is muted until the linked issue is fixed.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8131");
+
+        clientReconnectSessionExpire(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectSessionExpire1_2() throws Exception {
+        // Test is muted until the linked issue is fixed.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8131");
+
+        clientReconnectSessionExpire(true);
+    }
+
+    /**
+     * @param closeSock Test mode flag.
+     * @throws Exception If failed.
+ */ + private void clientReconnectSessionExpire(boolean closeSock) throws Exception { + startGrid(0); + + sesTimeout = 2000; + clientMode(true); + testSockNio = true; + + Ignite client = startGrid(1); + + client.cache(DEFAULT_CACHE_NAME).put(1, 1); + + reconnectClientNodes(log, Collections.singletonList(client), closeSock); + + assertEquals(1, client.cache(DEFAULT_CACHE_NAME).get(1)); + + client.compute().broadcast(new DummyCallable(null)); + } + + /** + * @throws Exception If failed. + */ + public void testForceClientReconnect() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + clientMode(true); + + startGrid(SRVS); + + reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable() { + @Override public Void call() throws Exception { + ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(SRVS)); + + spi.clientReconnect(); + + return null; + } + }); + + waitForTopology(SRVS + 1); + } + + /** + * @throws Exception If failed. + */ + public void testForcibleClientFail() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + clientMode(true); + + startGrid(SRVS); + + reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable() { + @Override public Void call() throws Exception { + ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(0)); + + spi.failNode(ignite(SRVS).cluster().localNode().id(), "Test forcible node fail"); + + return null; + } + }); + + waitForTopology(SRVS + 1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDuplicatedNodeId() throws Exception { + UUID nodeId0 = nodeId = UUID.randomUUID(); + + startGrid(0); + + int failingNodeIdx = 100; + + for (int i = 0; i < 5; i++) { + final int idx = failingNodeIdx++; + + nodeId = nodeId0; + + info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']'); + + Throwable err = GridTestUtils.assertThrows(log, new Callable() { + @Override public Void call() throws Exception { + startGrid(idx); + + return null; + } + }, IgniteCheckedException.class, null); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Node with the same ID already exists")); + + nodeId = null; + + info("Start node with unique ID [iter=" + i + ']'); + + Ignite ignite = startGrid(idx); + + nodeId0 = ignite.cluster().localNode().id(); + + waitForTopology(i + 2); + } + } + + /** + * @throws Exception If failed. + */ + public void testPing() throws Exception { + sesTimeout = 5000; + + startGrids(3); + + final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(1)); + + final UUID nodeId = ignite(2).cluster().localNode().id(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + assertTrue(spi.pingNode(nodeId)); + } + }, 32, "ping"); + + fut.get(); + + fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + spi.pingNode(nodeId); + } + }, 32, "ping"); + + U.sleep(100); + + stopGrid(2); + + fut.get(); + + fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + assertFalse(spi.pingNode(nodeId)); + } + }, 32, "ping"); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ + public void testWithPersistence1() throws Exception { + startWithPersistence(false); + } + + /** + * @throws Exception If failed. 
+ */ + public void testWithPersistence2() throws Exception { + startWithPersistence(true); + } + + /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationFailureResolve_1() throws Exception { + communicationFailureResolve_Simple(2); + } + + /** + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_2() throws Exception { + communicationFailureResolve_Simple(10); + } + + /** + * @param nodes Nodes number. + * @throws Exception If failed. + */ + private void communicationFailureResolve_Simple(int nodes) throws Exception { + assert nodes > 1; + + sesTimeout = 2000; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + startGridsMultiThreaded(nodes); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 3; i++) { + info("Iteration: " + i); + + int idx1 = rnd.nextInt(nodes); + + int idx2; + + do { + idx2 = rnd.nextInt(nodes); + } + while (idx1 == idx2); + + ZookeeperDiscoverySpi spi = spi(ignite(idx1)); + + spi.resolveCommunicationFailure(ignite(idx2).cluster().localNode(), new Exception("test")); + + checkInternalStructuresCleanup(); + } + } + + /** + * Tests case when one node fails before sending communication status. + * + * @throws Exception If failed. 
+ */ + public void testNoOpCommunicationErrorResolve_3() throws Exception { + sesTimeout = 2000; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + startGridsMultiThreaded(3); + + sesTimeout = 10_000; + + testSockNio = true; + sesTimeout = 5000; + + startGrid(3); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.resolveCommunicationFailure(ignite(1).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + U.sleep(1000); + + ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3)); + + nio.closeSocket(true); + + try { + stopGrid(3); + + fut.get(); + } + finally { + nio.allowConnect(); + } + + waitForTopology(3); + } + + /** + * Tests case when Coordinator fails while resolve process is in progress. + * + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_4() throws Exception { + testCommSpi = true; + + sesTimeout = 2000; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + startGrid(0); + + startGridsMultiThreaded(1, 3); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3)); + + commSpi.pingLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(1)); + + spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + U.sleep(1000); + + assertFalse(fut.isDone()); + + stopGrid(0); + + commSpi.pingLatch.countDown(); + + fut.get(); + + waitForTopology(3); + } + + /** + * Tests that nodes join is delayed while resolve is in progress. + * + * @throws Exception If failed. 
+ */ + public void testNoOpCommunicationErrorResolve_5() throws Exception { + testCommSpi = true; + + sesTimeout = 2000; + commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY; + + startGrid(0); + + startGridsMultiThreaded(1, 3); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3)); + + commSpi.pingStartLatch = new CountDownLatch(1); + commSpi.pingLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(1)); + + spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + assertTrue(commSpi.pingStartLatch.await(10, SECONDS)); + + try { + assertFalse(fut.isDone()); + + final AtomicInteger nodeIdx = new AtomicInteger(3); + + IgniteInternalFuture startFut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + startGrid(nodeIdx.incrementAndGet()); + + return null; + } + }, 3, "start-node"); + + U.sleep(1000); + + assertFalse(startFut.isDone()); + + assertEquals(4, ignite(0).cluster().nodes().size()); + + commSpi.pingLatch.countDown(); + + startFut.get(); + fut.get(); + + waitForTopology(7); + } + finally { + commSpi.pingLatch.countDown(); + } + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillNode_1() throws Exception { + communicationFailureResolve_KillNodes(2, Collections.singleton(2L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillNode_2() throws Exception { + communicationFailureResolve_KillNodes(3, Collections.singleton(2L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillNode_3() throws Exception { + communicationFailureResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception { + communicationFailureResolve_KillNodes(2, Collections.singleton(1L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception { + communicationFailureResolve_KillNodes(3, Collections.singleton(1L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception { + communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception { + communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L)); + } + + /** + * @param startNodes Number of nodes to start. + * @param killNodes Nodes to kill by resolve process. + * @throws Exception If failed. + */ + private void communicationFailureResolve_KillNodes(int startNodes, Collection killNodes) throws Exception { + testCommSpi = true; + + commFailureRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes); + + startGrids(startNodes); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(0)); + + commSpi.checkRes = new BitSet(startNodes); + + ZookeeperDiscoverySpi spi = null; + UUID killNodeId = null; + + for (Ignite node : G.allGrids()) { + ZookeeperDiscoverySpi spi0 = spi(node); + + if (!killNodes.contains(node.cluster().localNode().order())) + spi = spi0; + else + killNodeId = node.cluster().localNode().id(); + } + + assertNotNull(spi); + assertNotNull(killNodeId); + + try { + spi.resolveCommunicationFailure(spi.getNode(killNodeId), new Exception("test")); + + fail("Exception is not thrown"); + } + catch (IgniteSpiException e) { + assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); + } + + int expNodes = startNodes - killNodes.size(); + + waitForTopology(expNodes); + + for 
(Ignite node : G.allGrids()) + assertFalse(killNodes.contains(node.cluster().localNode().order())); + + startGrid(startNodes); + + waitForTopology(expNodes + 1); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationFailureResolve_KillCoordinator_5() throws Exception { + sesTimeout = 2000; + + testCommSpi = true; + commFailureRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY; + + startGrids(10); + + int crd = 0; + + int nodeIdx = 10; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + for (Ignite node : G.allGrids()) + ZkTestCommunicationSpi.testSpi(node).initCheckResult(10); + + UUID crdId = ignite(crd).cluster().localNode().id(); + + ZookeeperDiscoverySpi spi = spi(ignite(crd + 1)); + + try { + spi.resolveCommunicationFailure(spi.getNode(crdId), new Exception("test")); + + fail("Exception is not thrown"); + } + catch (IgniteSpiException e) { + assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); + } + + waitForTopology(9); + + startGrid(nodeIdx++); + + waitForTopology(10); + + crd++; + } + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationFailureResolve_KillRandom() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-8179"); + + sesTimeout = 2000; + + testCommSpi = true; + commFailureRslvr = KillRandomCommunicationFailureResolver.FACTORY; + + startGridsMultiThreaded(10); + + clientMode(true); + + startGridsMultiThreaded(10, 5); + + int nodeIdx = 15; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + ZookeeperDiscoverySpi spi = null; + + for (Ignite node : G.allGrids()) { + ZkTestCommunicationSpi.testSpi(node).initCheckResult(100); + + spi = spi(node); + } + + assert spi != null; + + try { + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); + } + catch (IgniteSpiException ignore) { + // No-op. 
+ } + + clientMode(ThreadLocalRandom.current().nextBoolean()); + + startGrid(nodeIdx++); + + awaitPartitionMapExchange(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationFailureResolver1() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + startGrids(3); + + ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(3, 2); + + UUID killedId = nodeId(2); + + assertNotNull(ignite(0).cluster().node(killedId)); + + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + + waitForTopology(2); + + assertNull(ignite(0).cluster().node(killedId)); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationFailureResolver2() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + startGrids(3); + + clientMode(true); + + startGridsMultiThreaded(3, 2); + + ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.testSpi(ignite(3)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.testSpi(ignite(4)).initCheckResult(5, 2, 3, 4); + + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + + waitForTopology(2); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationFailureResolver3() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(3, 1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDefaultCommunicationFailureResolver4() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationFailureResolver5() throws Exception { + defaultCommunicationFailureResolver_BreakCommunication(10, 1, 3, 6); + } + + /** + * @param startNodes Initial nodes number. + * @param breakNodes Node indices where communication server is closed. + * @throws Exception If failed. + */ + private void defaultCommunicationFailureResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception { + sesTimeout = 5000; + + startGridsMultiThreaded(startNodes); + + final CyclicBarrier b = new CyclicBarrier(breakNodes.length); + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer threadIdx) { + try { + b.await(); + + int nodeIdx = breakNodes[threadIdx]; + + info("Close communication: " + nodeIdx); + + ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure(); + } + catch (Exception e) { + fail("Unexpected error: " + e); + } + } + }, breakNodes.length, "break-communication"); + + waitForTopology(startNodes - breakNodes.length); + } + + /** + * @throws Exception If failed. 
+ */ + public void testCommunicationFailureResolve_CachesInfo1() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver(); + + commFailureRslvr = new IgniteOutClosure() { + @Override public CommunicationFailureResolver apply() { + return rslvr; + } + }; + + startGrids(2); + + awaitPartitionMapExchange(); + + Map> expCaches = new HashMap<>(); + + expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1)); + + checkResolverCachesInfo(ignite(0), expCaches); + + List caches = new ArrayList<>(); + + CacheConfiguration c1 = new CacheConfiguration("c1"); + c1.setBackups(1); + c1.setAffinity(new RendezvousAffinityFunction(false, 64)); + caches.add(c1); + + CacheConfiguration c2 = new CacheConfiguration("c2"); + c2.setBackups(2); + c2.setAffinity(new RendezvousAffinityFunction(false, 128)); + caches.add(c2); + + CacheConfiguration c3 = new CacheConfiguration("c3"); + c3.setCacheMode(CacheMode.REPLICATED); + c3.setAffinity(new RendezvousAffinityFunction(false, 256)); + caches.add(c3); + + ignite(0).createCaches(caches); + + expCaches.put("c1", new T3<>(64, 1, 2)); + expCaches.put("c2", new T3<>(128, 2, 2)); + expCaches.put("c3", new T3<>(256, 1, 2)); + + checkResolverCachesInfo(ignite(0), expCaches); + + startGrid(2); + startGrid(3); + + awaitPartitionMapExchange(); + + expCaches.put("c2", new T3<>(128, 2, 3)); + expCaches.put("c3", new T3<>(256, 1, 4)); + + checkResolverCachesInfo(ignite(0), expCaches); + + CacheConfiguration c4 = new CacheConfiguration("c4"); + c4.setCacheMode(CacheMode.PARTITIONED); + c4.setBackups(0); + c4.setAffinity(new RendezvousAffinityFunction(false, 256)); + c4.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1))); + + ignite(2).createCache(c4); + + expCaches.put("c4", new T3<>(256, 0, 1)); + + checkResolverCachesInfo(ignite(0), expCaches); + + 
stopGrid(0); // Stop current coordinator, check new coordinator will initialize required caches information. + + awaitPartitionMapExchange(); + + expCaches.put("c3", new T3<>(256, 1, 3)); + + checkResolverCachesInfo(ignite(1), expCaches); + + startGrid(0); + + expCaches.put("c3", new T3<>(256, 1, 4)); + + checkResolverCachesInfo(ignite(1), expCaches); + + stopGrid(1); + + expCaches.put("c3", new T3<>(256, 1, 3)); + + checkResolverCachesInfo(ignite(3), expCaches); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationFailureResolve_CachesInfo2() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver(); + + commFailureRslvr = new IgniteOutClosure() { + @Override public CommunicationFailureResolver apply() { + return rslvr; + } + }; + + Ignite srv0 = startGrid(0); + + CacheConfiguration ccfg = new CacheConfiguration("c1