Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6121917450 for ; Fri, 17 Jul 2015 06:28:44 +0000 (UTC) Received: (qmail 24962 invoked by uid 500); 17 Jul 2015 06:28:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 24932 invoked by uid 500); 17 Jul 2015 06:28:44 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 24922 invoked by uid 99); 17 Jul 2015 06:28:44 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2015 06:28:44 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A34421A71E1 for ; Fri, 17 Jul 2015 06:28:43 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id LbvKlx7r7M7W for ; Fri, 17 Jul 2015 06:28:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 5ED434E217 for ; Fri, 17 Jul 2015 06:28:14 +0000 (UTC) Received: (qmail 22457 invoked by uid 99); 17 Jul 2015 06:28:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2015 06:28:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AE32BE6836; Fri, 17 Jul 2015 06:28:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 17 Jul 2015 06:28:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [36/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java new file mode 100644 index 0000000..be3234d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -0,0 +1,1202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import junit.framework.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest { + /** */ + private static final int SRV_CNT = 3; + + /** */ + private static final String STATIC_CACHE = "static-cache"; + + /** */ + private UUID nodeId; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestCommunicationSpi commSpi = new TestCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000); + + if (nodeId != null) { + cfg.setNodeId(nodeId); + + nodeId = null; + } + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(STATIC_CACHE); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(SRV_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnect() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteCache cache = client.getOrCreateCache(new CacheConfiguration<>()); + + final IgniteCache staticCache = client.cache(STATIC_CACHE); + + staticCache.put(1, 1); + + assertEquals(1, staticCache.get(1)); + + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setName("nearCache"); + + final IgniteCache nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>()); + + nearCache.put(1, 1); + + assertEquals(1, nearCache.localPeek(1)); + + cache.put(1, 1); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + final AtomicReference blockPutRef = new AtomicReference<>(); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + assertEquals(1, reconnectLatch.getCount()); + + blockPutRef.set(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + log.info("Start put."); + + try { + cache.put(2, 2); + + fail(); + } + catch (CacheException e) { + log.info("Expected exception: " + e); + + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause(); + + e0.reconnectFuture().get(); + } + + cache.put(2, 2); + + log.info("Finish put."); + + return null; + } + })); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(disconnectLatch); + + IgniteInternalFuture putFut = blockPutRef.get(); + + assertNotDone(putFut); + + U.sleep(5000); + + assertNotDone(putFut); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + checkCacheDiscoveryData(srv, client, null, true, true, false); + + checkCacheDiscoveryData(srv, client, "nearCache", true, true, true); + + checkCacheDiscoveryData(srv, client, STATIC_CACHE, true, true, false); + + assertEquals(1, cache.get(1)); + + putFut.get(); + + assertEquals(2, cache.get(2)); + + cache.put(3, 3); + + assertEquals(3, cache.get(3)); + + assertNull(nearCache.localPeek(1)); + + staticCache.put(10, 10); + + assertEquals(10, staticCache.get(10)); + + nearCache.put(20, 20); + + srv.cache(nearCache.getName()).put(20, 21); + + assertEquals(21, nearCache.localPeek(20)); + + this.clientMode = false; + + IgniteEx srv2 = startGrid(SRV_CNT + 1); + + Integer key = primaryKey(srv2.cache(null)); + + cache.put(key, 4); + + assertEquals(4, cache.get(key)); + + checkCacheDiscoveryData(srv2, client, null, true, true, false); + + checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true); + + checkCacheDiscoveryData(srv2, client, STATIC_CACHE, true, true, false); + + staticCache.put(20, 20); + + assertEquals(20, staticCache.get(20)); + + srv.cache(nearCache.getName()).put(20, 22); + + assertEquals(22, nearCache.localPeek(20)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactions() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + Ignite srv = clientRouter(client); + + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + + IgniteCache cache = client.getOrCreateCache(ccfg); + + final IgniteTransactions txs = client.transactions(); + + final Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ); + + cache.put(1, 1); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + try { + tx.commit(); + + fail(); + } catch (IgniteClientDisconnectedException e) { + log.info("Expected error: " + e); + + assertNotNull(e.reconnectFuture()); + } + + try { + txs.txStart(); + + fail(); + } catch (IgniteClientDisconnectedException e) { + log.info("Expected error: " + e); + + assertNotNull(e.reconnectFuture()); + } + } + }); + + assertNull(txs.tx()); + + try (Transaction tx0 = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + tx0.commit(); + } + + try (Transaction tx0 = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(2, 2); + + assertEquals(2, cache.get(2)); + + tx0.commit(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactionInProgress1() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + IgniteCache cache = client.getOrCreateCache(ccfg); + + reconnectTransactionInProgress1(client, OPTIMISTIC, cache); + + reconnectTransactionInProgress1(client, PESSIMISTIC, cache); + } + + /** + * @param client Client. + * @param txConcurrency Transaction concurrency mode. + * @param cache Cache. + * @throws Exception If failed. + */ + private void reconnectTransactionInProgress1(IgniteEx client, + final TransactionConcurrency txConcurrency, + final IgniteCache cache) + throws Exception + { + Ignite srv = clientRouter(client); + + final TestTcpDiscoverySpi clientSpi = spi(client); + final TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + final IgniteTransactions txs = client.transactions(); + + final CountDownLatch afterPut1 = new CountDownLatch(1); + + final CountDownLatch afterPut2 = new CountDownLatch(1); + + final CountDownLatch putFailed = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + try { + log.info("Start tx1: " + txConcurrency); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(1, 1); + + afterPut1.countDown(); + + afterPut2.await(); + + cache.put(2, 2); + + fail(); + } + catch (CacheException e) { + log.info("Expected exception: " + e); + + putFailed.countDown(); + + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); + + e0.reconnectFuture().get(); + } + + log.info("Start tx2: " + txConcurrency); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(1, 1); + + cache.put(2, 2); + + tx.commit(); + } + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + + try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.put(3, 3); + + cache.put(4, 4); + + tx.commit(); + } + + assertEquals(1, cache.get(1)); + assertEquals(2, cache.get(2)); + assertEquals(3, cache.get(3)); + assertEquals(4, cache.get(4)); + + cache.removeAll(); + + return true; + } + catch (AssertionFailedError e) { + throw e; + } + catch (Throwable e) { + log.error("Unexpected error", e); + + fail("Unexpected error: " + e); + + return false; + } + } + }); + + assertTrue(afterPut1.await(5000, MILLISECONDS)); + + assertNotDone(fut); + + srvSpi.failNode(client.localNode().id(), null); + + waitReconnectEvent(disconnectLatch); + + afterPut2.countDown(); + + assertTrue(putFailed.await(5000, MILLISECONDS)); + + clientSpi.writeLatch.countDown(); + + waitReconnectEvent(reconnectLatch); + + assertTrue(fut.get()); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectTransactionInProgress2() throws Exception { + clientMode = true; + + final IgniteEx client = startGrid(SRV_CNT); + + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, OPTIMISTIC, 1); + + txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, PESSIMISTIC, 2); + + txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, OPTIMISTIC, 3); + + txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, PESSIMISTIC, 4); + + txInProgressFails(client, ccfg, GridNearLockResponse.class, PESSIMISTIC, 5); + } + + /** + * @param client Client. + * @param ccfg Cache configuration. + * @param msgToBlock Message to block. + * @param txConcurrency Transaction concurrency mode. + * @param key Key. + * @throws Exception If failed. + */ + private void txInProgressFails(final IgniteEx client, + final CacheConfiguration ccfg, + Class msgToBlock, + final TransactionConcurrency txConcurrency, + final Integer key) throws Exception { + log.info("Test tx failure [msg=" + msgToBlock + ", txMode=" + txConcurrency + ", key=" + key + ']'); + + checkOperationInProgressFails(client, ccfg, msgToBlock, + new CI1>() { + @Override public void apply(IgniteCache cache) { + try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) { + log.info("Put1: " + key); + + cache.put(key, key); + + Integer key2 = key + 1; + + log.info("Put2: " + key2); + + cache.put(key2, key2); + + log.info("Commit [key1=" + key + ", key2=" + key2 + ']'); + + tx.commit(); + } + } + } + ); + + IgniteCache cache = client.cache(ccfg.getName()); + + assertEquals(key, cache.get(key)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectExchangeInProgress() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); + + coordCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id()); + + clientMode = false; + + startGrid(SRV_CNT + 1); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + try { + coordCommSpi.stopBlock(true); + + fail(); + } + catch (IgniteException e) { + log.info("Expected error: " + e); + } + + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("newCache"); + + ccfg.setCacheMode(REPLICATED); + + log.info("Start new cache."); + + IgniteCache cache = client.getOrCreateCache(ccfg); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectInitialExchangeInProgress() throws Exception { + final UUID clientId = UUID.randomUUID(); + + Ignite srv = grid(0); + + final CountDownLatch joinLatch = new CountDownLatch(1); + + srv.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) { + info("Client joined: " + evt); + + joinLatch.countDown(); + } + + return true; + } + }, EVT_NODE_JOINED); + + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId); + + clientMode = true; + + nodeId = clientId; + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + try { + Ignition.start(getConfiguration(getTestGridName(SRV_CNT))); + + fail(); + + return false; + } + catch (IgniteClientDisconnectedException e) { + log.info("Expected start error: " + e); + + try { + e.reconnectFuture().get(); + + fail(); + } + catch (IgniteException e0) { + log.info("Expected future error: " + e0); + } + + return true; + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + }); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + assertTrue(joinLatch.await(5000, MILLISECONDS)); + + U.sleep(1000); + + assertNotDone(fut); + + srvSpi.failNode(clientId, null); + + srvCommSpi.stopBlock(false); + + assertTrue(fut.get()); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectOperationInProgress() throws Exception { + clientMode = true; + + IgniteEx client = startGrid(SRV_CNT); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) + info("Client disconnected: " + evt); + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) + info("Client reconnected: " + evt); + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + IgniteInClosure> putOp = new CI1>() { + @Override public void apply(IgniteCache cache) { + cache.put(1, 1); + } + }; + + IgniteInClosure> getOp = new CI1>() { + @Override public void apply(IgniteCache cache) { + cache.get(1); + } + }; + + int cnt = 0; + + for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { + CacheAtomicWriteOrderMode[] writeOrders = + atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() : + new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK}; + + for (CacheAtomicWriteOrderMode writeOrder : writeOrders) { + for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + + ccfg.setAtomicWriteOrderMode(writeOrder); + + ccfg.setName("cache-" + cnt++); + + ccfg.setWriteSynchronizationMode(syncMode); + + if (syncMode != FULL_ASYNC) { + Class cls = (ccfg.getAtomicityMode() == ATOMIC) ? + GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; + + log.info("Test cache put [atomicity=" + atomicityMode + + ", writeOrder=" + writeOrder + + ", syncMode=" + syncMode + ']'); + + checkOperationInProgressFails(client, ccfg, cls, putOp); + + client.destroyCache(ccfg.getName()); + } + + log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); + + checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp); + + client.destroyCache(ccfg.getName()); + } + } + } + } + + /** + * @throws Exception If failed. + */ + public void testReconnectCacheDestroyed() throws Exception { + clientMode = true; + + final IgniteEx client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srv.destroyCache(null); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + + checkCacheDiscoveryData(srv, client, null, false, false, false); + + IgniteCache clientCache0 = client.getOrCreateCache(new CacheConfiguration<>()); + + checkCacheDiscoveryData(srv, client, null, true, true, false); + + clientCache0.put(1, 1); + + assertEquals(1, clientCache0.get(1)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectCacheDestroyedAndCreated() throws Exception { + clientMode = true; + + final Ignite client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + assertEquals(ATOMIC, + clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srv.destroyCache(null); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + + srv.getOrCreateCache(ccfg); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + + checkCacheDiscoveryData(srv, client, null, true, false, false); + + IgniteCache clientCache0 = client.cache(null); + + checkCacheDiscoveryData(srv, client, null, true, true, false); + + assertEquals(TRANSACTIONAL, + clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + clientCache0.put(1, 1); + + assertEquals(1, clientCache0.get(1)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectMarshallerCache() throws Exception { + clientMode = true; + + final Ignite client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + final IgniteCache srvCache = srv.cache(null); + + assertNotNull(srvCache); + + clientCache.put(1, new TestClass1()); + srvCache.put(2, new TestClass2()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertNotNull(srvCache.get(1)); + assertNotNull(srvCache.get(2)); + + srvCache.put(3, new TestClass3()); + } + }); + + srvCache.put(4, new TestClass4()); + + assertNotNull(clientCache.get(1)); + assertNotNull(clientCache.get(2)); + assertNotNull(clientCache.get(3)); + assertNotNull(clientCache.get(4)); + + clientCache.put(5, new TestClass5()); + + assertNotNull(srvCache.get(5)); + assertNotNull(clientCache.get(5)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectClusterRestart() throws Exception { + clientMode = true; + + final Ignite client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + clientCache.put(1, new TestClass1()); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + for (int i = 0; i < SRV_CNT; i++) + stopGrid(i); + + assertTrue(disconnectLatch.await(30_000, MILLISECONDS)); + + clientMode = false; + + Ignite srv = startGrid(0); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + + IgniteCache srvCache = srv.getOrCreateCache(new CacheConfiguration<>()); + + srvCache.put(1, new TestClass1()); + srvCache.put(2, new TestClass2()); + + IgniteCache clientCache2 = client.cache(null); + + assertNotNull(clientCache2); + + assertNotNull(clientCache2.get(1)); + assertNotNull(clientCache2.get(2)); + } + + /** + * + */ + static class TestClass1 implements Serializable {} + + /** + * + */ + static class TestClass2 implements Serializable {} + + /** + * + */ + static class TestClass3 implements Serializable {} + + /** + * + */ + static class TestClass4 implements Serializable {} + + /** + * + */ + static class TestClass5 implements Serializable {} + + /** + * @param client Client. + * @param ccfg Cache configuration. + * @param msgToBlock Message to block. + * @param c Cache operation closure. + * @throws Exception If failed. + */ + private void checkOperationInProgressFails(IgniteEx client, + final CacheConfiguration ccfg, + Class msgToBlock, + final IgniteInClosure> c) + throws Exception + { + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteCache cache = client.getOrCreateCache(ccfg); + + for (int i = 0; i < SRV_CNT; i++) { + TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi(); + + srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); + } + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + IgniteClientDisconnectedException e0 = null; + + try { + c.apply(cache); + + fail(); + } + catch (IgniteClientDisconnectedException e) { + log.info("Expected exception: " + e); + + e0 = e; + } + catch (CacheException e) { + log.info("Expected exception: " + e); + + assertTrue("Unexpected cause: " + e.getCause(), + e.getCause() instanceof IgniteClientDisconnectedException); + + e0 = (IgniteClientDisconnectedException)e.getCause(); + } + + assertNotNull(e0); + assertNotNull(e0.reconnectFuture()); + + e0.reconnectFuture().get(); + + c.apply(cache); + + return null; + } + }); + + Thread.sleep(1000); + + assertNotDone(fut); + + log.info("Fail client: " + client.localNode().id()); + + srvSpi.failNode(client.localNode().id(), null); + + fut.get(); + + for (int i = 0; i < SRV_CNT; i++) + ((TestCommunicationSpi)grid(i).configuration().getCommunicationSpi()).stopBlock(false); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + } + + /** + * @param srv Server node. + * @param client Client node. + * @param cacheName Cache name. + * @param cacheExists Cache exists flag. + * @param clientCache {@code True} if client node has client cache. + * @param clientNear {@code True} if client node has near-enabled client cache. + */ + private void checkCacheDiscoveryData(Ignite srv, + Ignite client, + String cacheName, + boolean cacheExists, + boolean clientCache, + boolean clientNear) + { + GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); + GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); + + ClusterNode srvNode = ((IgniteKernal)srv).localNode(); + ClusterNode clientNode = ((IgniteKernal)client).localNode(); + + assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName)); + assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName)); + + assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) + assertTrue(srvDisco.cacheNearNode(clientNode, cacheName)); + else + assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + + assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) + assertTrue(clientDisco.cacheNearNode(clientNode, cacheName)); + else + assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); + + if (cacheExists) { + if (clientCache || clientNear) { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + else { + assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + } + else { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty()); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty()); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + private List> blockedMsgs = new ArrayList<>(); + + /** */ + private Map, Set> blockCls = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + Set blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class cls, UUID nodeId) { + synchronized (this) { + Set set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * @param snd Send messages flag. + */ + void stopBlock(boolean snd) { + synchronized (this) { + blockCls.clear(); + + if (snd) { + for (T2 msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + + blockedMsgs.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java new file mode 100644 index 0000000..ed811d9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testQueueReconnect() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnect(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnect(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testQueueReconnectRemoved() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnectRemoved(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnectRemoved(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testQueueReconnectInProgress() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnectInProgress(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnectInProgress(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testSetReconnect() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnect(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnect(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testSetReconnectRemoved() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnectRemove(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnectRemove(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testSetReconnectInProgress() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnectInProgress(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnectInProgress(colCfg); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnect(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "set-" + colCfg.getAtomicityMode(); + + IgniteSet clientSet = client.set(setName, colCfg); + + final IgniteSet srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertTrue(srvSet.add("2")); + } + }); + + assertFalse(clientSet.add("2")); + + assertTrue(clientSet.remove("2")); + + assertFalse(srvSet.contains("2")); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final String setName = "set-rm-" + colCfg.getAtomicityMode(); + + final IgniteSet clientSet = client.set(setName, colCfg); + + final IgniteSet srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvSet.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + clientSet.add("fail"); + + return null; + } + }, IllegalStateException.class, null); + + IgniteSet newClientSet = client.set(setName, colCfg); + + IgniteSet newSrvSet = srv.set(setName, null); + + assertTrue(newClientSet.add("1")); + + assertFalse(newSrvSet.add("1")); + + newSrvSet.close(); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnectInProgress(final CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final String setName = "set-in-progress-" + colCfg.getAtomicityMode(); + + final IgniteSet clientSet = client.set(setName, colCfg); + + final IgniteSet srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + if (colCfg.getAtomicityMode() == ATOMIC) + commSpi.blockMessage(GridNearAtomicUpdateResponse.class); + else + commSpi.blockMessage(GridNearTxPrepareResponse.class); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 100; i++) + clientSet.add("2"); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + + assertTrue(clientSet.add("3")); + + assertFalse(srvSet.add("3")); + + srvSet.close(); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnect(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-" + colCfg.getAtomicityMode(); + + IgniteQueue clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.offer("1")); + + assertTrue(srvQueue.contains("1")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertTrue(srvQueue.add("2")); + } + }); + + assertTrue(clientQueue.contains("2")); + + assertEquals("1", clientQueue.poll()); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-rmv" + colCfg.getAtomicityMode(); + + final IgniteQueue clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.add("1")); + + assertTrue(srvQueue.add("2")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvQueue.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + clientQueue.add("fail"); + + return null; + } + }, IllegalStateException.class, null); + + IgniteQueue newClientQueue = client.queue(setName, 10, colCfg); + + IgniteQueue newSrvQueue = srv.queue(setName, 10, null); + + assertTrue(newClientQueue.add("1")); + + assertTrue(newSrvQueue.add("2")); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnectInProgress(final CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-rmv" + colCfg.getAtomicityMode(); + + final IgniteQueue clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.offer("1")); + + assertTrue(srvQueue.contains("1")); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + if (colCfg.getAtomicityMode() == ATOMIC) + commSpi.blockMessage(GridNearAtomicUpdateResponse.class); + else + commSpi.blockMessage(GridNearTxPrepareResponse.class); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + clientQueue.add("2"); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", (Boolean)fut.get()); + + assertTrue(clientQueue.add("3")); + + assertEquals("1", clientQueue.poll()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java new file mode 100644 index 0000000..e9667a1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testReconnectAffinityCallInProgress() throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteCache cache = client.getOrCreateCache("test-cache"); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(GridJobExecuteResponse.class); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + client.compute().affinityCall("test-cache", 40, new IgniteCallable() { + @Override public Integer call() throws Exception { + return 42; + } + }); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectBroadcastInProgress() throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(GridJobExecuteResponse.class); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + client.compute().broadcast(new IgniteCallable() { + @Override public Object call() throws Exception { + return 42; + } + }); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectApplyInProgress() throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + commSpi.blockMessage(GridJobExecuteResponse.class); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + client.compute().apply(new IgniteClosure() { + @Override public Integer apply(Integer o) { + return o + 1; + } + }, Arrays.asList(1, 2, 3)); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + + return true; + } + + return false; + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMessage(); + + reconnectClientNode(client, srv, null); + + assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java new file mode 100644 index 0000000..2bfdc85b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; + +import javax.cache.event.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest { + /** */ + private static volatile CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testEventListenerReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + EventListener lsnr = new EventListener(); + + UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED); + + lsnr.latch = new CountDownLatch(1); + + log.info("Created remote listener: " + opId); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(reconnectLatch); + + client.compute().run(new DummyJob()); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + srv.compute().run(new DummyJob()); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + log.info("Stop listen, should not get events anymore."); + + client.events().stopRemoteListen(opId); + + assertFalse(lsnr.latch.await(3000, MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testMessageListenerReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final String topic = "testTopic"; + + MessageListener locLsnr = new MessageListener(); + + UUID opId = client.message().remoteListen(topic, new RemoteMessageListener()); + + client.message().localListen(topic, locLsnr); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(reconnectLatch); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(2); + + client.message().send(topic, "msg1"); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertTrue(latch.await(5000, MILLISECONDS)); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(2); + + srv.message().send(topic, "msg2"); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertTrue(latch.await(5000, MILLISECONDS)); + + log.info("Stop listen, should not get remote messages anymore."); + + client.message().stopRemoteListen(opId); + + srv.message().send(topic, "msg3"); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(1); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertFalse(latch.await(3000, MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testCacheContinuousQueryReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setAutoUnsubscribe(true); + + qry.setLocalListener(lsnr); + + QueryCursor cur = clientCache.query(qry); + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + continuousQueryReconnect(client, clientCache, lsnr); + } + + log.info("Close cursor, should not get cache events anymore."); + + cur.close(); + + lsnr.latch = new CountDownLatch(1); + + clientCache.put(3, 3); + + assertFalse(lsnr.latch.await(3000, MILLISECONDS)); + } + + /** + * @param client Client. + * @param clientCache Client cache. + * @param lsnr Continuous query listener. + * @throws Exception If failed. + */ + private void continuousQueryReconnect(Ignite client, + IgniteCache clientCache, + CacheEventListener lsnr) + throws Exception + { + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + IgnitePredicate p = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }; + + client.events().localListen(p, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(reconnectLatch); + + client.events().stopLocalListen(p); + + lsnr.latch = new CountDownLatch(1); + + clientCache.put(1, 1); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + srv.cache(null).put(2, 2); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + } + + /** + * + */ + private static class EventListener implements P2 { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Event evt) { + assertTrue(ignite.cluster().localNode().isClient()); + + ignite.log().info("Received event: " + evt); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class MessageListener implements P2 { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + assertTrue(ignite.cluster().localNode().isClient()); + + ignite.log().info("Local listener received message: " + msg); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class RemoteMessageListener implements P2 { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + ignite.log().info("Remote listener received message: " + msg); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) { + int cnt = 0; + + for (CacheEntryEvent evt : evts) { + ignite.log().info("Received cache event: " + evt); + + cnt++; + } + + assertEquals(1, cnt); + + if (latch != null) + latch.countDown(); + } + } + + /** + * + */ + static class DummyJob implements IgniteRunnable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + ignite.log().info("Job run."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java new file mode 100644 index 0000000..feeebe5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testReconnect() throws Exception { + final Ignite client = ignite(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + long topVer = 4; + + IgniteCluster cluster = client.cluster(); + + cluster.nodeLocalMap().put("locMapKey", 10); + + Map nodeCnt = new HashMap<>(); + + nodeCnt.put(1, 1); + nodeCnt.put(2, 2); + nodeCnt.put(3, 3); + nodeCnt.put(4, 4); + + for (Map.Entry e : nodeCnt.entrySet()) { + Collection nodes = cluster.topology(e.getKey()); + + assertNotNull("No nodes for topology: " + e.getKey(), nodes); + assertEquals((int)e.getValue(), nodes.size()); + } + + ClusterNode locNode = cluster.localNode(); + + assertEquals(topVer, locNode.order()); + + TestTcpDiscoverySpi srvSpi = spi(clientRouter(client)); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + IgniteFuture fut = client.cluster().clientReconnectFuture(); + + assertNotNull(fut); + assertFalse(fut.isDone()); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(reconnectLatch); + + topVer += 2; // Client failed and rejoined. + + locNode = cluster.localNode(); + + assertEquals(topVer, locNode.order()); + assertEquals(topVer, cluster.topologyVersion()); + + nodeCnt.put(5, 3); + nodeCnt.put(6, 4); + + for (Map.Entry e : nodeCnt.entrySet()) { + Collection nodes = cluster.topology(e.getKey()); + + assertNotNull("No nodes for topology: " + e.getKey(), nodes); + assertEquals((int)e.getValue(), nodes.size()); + } + + assertEquals(10, cluster.nodeLocalMap().get("locMapKey")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java new file mode 100644 index 0000000..1b6523a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.testframework.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest { + /** */ + private static final Integer THREADS = 1; + + /** */ + private volatile CyclicBarrier barrier; + + /** */ + protected static final long TEST_TIME = 90_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(30_000); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIME * 60_000; + } + + /** + * @param c Test closure. + * @throws Exception If failed. + */ + protected final void reconnectFailover(final Callable c) throws Exception { + final Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final AtomicBoolean stop = new AtomicBoolean(false); + + final IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + try { + int iter = 0; + + while (!stop.get()) { + try { + c.call(); + } + catch (CacheException e) { + checkAndWait(e); + } + catch (IgniteClientDisconnectedException e) { + checkAndWait(e); + } + + if (++iter % 100 == 0) + log.info("Iteration: " + iter); + + if (barrier != null) + barrier.await(); + } + + return null; + } catch (Throwable e) { + log.error("Unexpected error in operation thread: " + e, e); + + stop.set(true); + + throw e; + } + } + }, THREADS, "test-operation-thread"); + + final AtomicReference disconnected = new AtomicReference<>(); + final AtomicReference reconnected = new AtomicReference<>(); + + IgnitePredicate p = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + CountDownLatch latch = reconnected.get(); + + assertNotNull(latch); + assertEquals(1, latch.getCount()); + + latch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + CountDownLatch latch = disconnected.get(); + + assertNotNull(latch); + assertEquals(1, latch.getCount()); + + latch.countDown(); + } + + return true; + } + }; + + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + try { + long stopTime = System.currentTimeMillis() + TEST_TIME; + + String err = null; + + while (System.currentTimeMillis() < stopTime && !fut.isDone()) { + U.sleep(500); + + CountDownLatch disconnectLatch = new CountDownLatch(1); + CountDownLatch reconnectLatch = new CountDownLatch(1); + + disconnected.set(disconnectLatch); + reconnected.set(reconnectLatch); + + UUID nodeId = client.cluster().localNode().id(); + + log.info("Fail client: " + nodeId); + + srvSpi.failNode(nodeId, null); + + if (!disconnectLatch.await(10_000, MILLISECONDS)) { + err = "Failed to wait for disconnect"; + + break; + } + + if (!reconnectLatch.await(10_000, MILLISECONDS)) { + err = "Failed to wait for reconnect"; + + break; + } + + barrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + barrier = null; + } + }); + + try { + barrier.await(10, SECONDS); + } + catch (TimeoutException e) { + err = "Operations hang or fail with unexpected error."; + + break; + } + } + + if (err != null) { + log.error(err); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = barrier; + + if (barrier0 != null) + barrier0.reset(); + + stop.set(true); + + fut.get(); + + fail(err); + } + + stop.set(true); + + fut.get(); + } + finally { + client.events().stopLocalListen(p); + + stop.set(true); + } + } +}