Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C646217BFF for ; Mon, 2 Nov 2015 10:44:56 +0000 (UTC) Received: (qmail 71518 invoked by uid 500); 2 Nov 2015 10:44:56 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 71474 invoked by uid 500); 2 Nov 2015 10:44:56 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 71395 invoked by uid 99); 2 Nov 2015 10:44:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Nov 2015 10:44:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 503C8DFE1B; Mon, 2 Nov 2015 10:44:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Date: Mon, 02 Nov 2015 10:44:56 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/6] ignite git commit: ignite-1758 Fixed issues with client reconnect handling Repository: ignite Updated Branches: refs/heads/ignite-1.4-slow-server-debug 0ae2441d4 -> b811c8da0 http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 1ccbe1f..09b3ef8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -17,6 +17,9 @@ package org.apache.ignite.spi.discovery.tcp; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; @@ -24,18 +27,30 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.client.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; @@ -53,8 +68,14 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { private static final ThreadLocal clientFlagPerThread = new ThreadLocal<>(); /** */ + private static final ThreadLocal nodeId = new ThreadLocal<>(); + + /** */ private static volatile boolean clientFlagGlobal; + /** */ + private static GridConcurrentHashSet failedNodes = new GridConcurrentHashSet<>(); + /** * @return Client node flag. */ @@ -79,10 +100,37 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + UUID id = nodeId.get(); + + if (id != null) { + cfg.setNodeId(id); + + nodeId.set(null); + } + if (client()) cfg.setClientMode(true); - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + cfg.setDiscoverySpi(new TcpDiscoverySpi(). + setIpFinder(ipFinder). + setJoinTimeout(60_000). + setNetworkTimeout(10_000)); + + int[] evts = {EVT_NODE_FAILED, EVT_NODE_LEFT}; + + Map, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; + + failedNodes.add(discoveryEvt.eventNode().id()); + + return true; + } + }, evts); + + cfg.setLocalEventListeners(lsnrs); cfg.setCacheConfiguration(); @@ -90,6 +138,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { cfg.setIncludeProperties(); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + return cfg; } @@ -98,6 +148,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { stopAllGrids(); super.afterTest(); + + failedNodes.clear(); } /** {@inheritDoc} */ @@ -111,114 +163,215 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { public void testMultiThreadedClientsRestart() throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-1123"); - clientFlagGlobal = false; + final AtomicBoolean done = new AtomicBoolean(); - info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + try { + clientFlagGlobal = false; - startGridsMultiThreaded(GRID_CNT); + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - clientFlagGlobal = true; + startGridsMultiThreaded(GRID_CNT); - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + clientFlagGlobal = true; - final AtomicBoolean done = new AtomicBoolean(); + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); - IgniteInternalFuture fut1 = multithreadedAsync( - new Callable() { - @Override public Object call() throws Exception { - clientFlagPerThread.set(true); + IgniteInternalFuture fut1 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + clientFlagPerThread.set(true); - int idx = clientIdx.getAndIncrement(); + int idx = clientIdx.getAndIncrement(); - while (!done.get()) { - stopGrid(idx, true); - startGrid(idx); - } + while (!done.get()) { + stopGrid(idx, true); + startGrid(idx); + } - return null; - } - }, - CLIENT_GRID_CNT - ); + return null; + } + }, + CLIENT_GRID_CNT, + "client-restart"); - Thread.sleep(getTestTimeout() - 60 * 1000); + Thread.sleep(getTestTimeout() - 60 * 1000); - done.set(true); + done.set(true); - fut1.get(); + fut1.get(); + } + finally { + done.set(true); + } } /** * @throws Exception If any error occurs. */ - public void testMultiThreadedClientsServersRestart() throws Exception { + public void testMultiThreadedClientsServersRestart() throws Throwable { fail("https://issues.apache.org/jira/browse/IGNITE-1123"); - clientFlagGlobal = false; + final AtomicBoolean done = new AtomicBoolean(); + + try { + clientFlagGlobal = false; - info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); + info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); - startGridsMultiThreaded(GRID_CNT); + startGridsMultiThreaded(GRID_CNT); - clientFlagGlobal = true; + clientFlagGlobal = true; - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - final AtomicBoolean done = new AtomicBoolean(); + final AtomicReference error = new AtomicReference<>(); - final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT); + final BlockingQueue clientStopIdxs = new LinkedBlockingQueue<>(); - IgniteInternalFuture fut1 = multithreadedAsync( - new Callable() { - @Override public Object call() throws Exception { - clientFlagPerThread.set(true); + for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) + clientStopIdxs.add(i); - int idx = clientIdx.getAndIncrement(); + final AtomicInteger clientStartIdx = new AtomicInteger(9000); - while (!done.get()) { - stopGrid(idx); - startGrid(idx); + IgniteInternalFuture fut1 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(true); + + while (!done.get() && error.get() == null) { + Integer stopIdx = clientStopIdxs.take(); + + log.info("Stop client: " + stopIdx); + + stopGrid(stopIdx); + + while (!done.get() && error.get() == null) { + // Generate unique name to simplify debugging. + int startIdx = clientStartIdx.getAndIncrement(); + + log.info("Start client: " + startIdx); + + UUID id = UUID.randomUUID(); + + nodeId.set(id); + + try { + Ignite ignite = startGrid(startIdx); + + assertTrue(ignite.configuration().isClientMode()); + + clientStopIdxs.add(startIdx); + + break; + } + catch (Exception e) { + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) || + X.hasCause(e, IgniteClientDisconnectedException.class)) + log.info("Client disconnected: " + e); + else { + if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class)) + log.info("Client failed: " + e); + else + throw e; + } + } + } + } + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + error.compareAndSet(null, e); + + return null; + } + + return null; } + }, + CLIENT_GRID_CNT, + "client-restart"); - return null; - } - }, - CLIENT_GRID_CNT - ); + final BlockingQueue srvStopIdxs = new LinkedBlockingQueue<>(); - final BlockingQueue srvIdx = new LinkedBlockingQueue<>(); + for (int i = 0; i < GRID_CNT; i++) + srvStopIdxs.add(i); - for (int i = 0; i < GRID_CNT; i++) - srvIdx.add(i); + final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT); - IgniteInternalFuture fut2 = multithreadedAsync( - new Callable() { - @Override public Object call() throws Exception { - clientFlagPerThread.set(false); + IgniteInternalFuture fut2 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + try { + clientFlagPerThread.set(false); + + while (!done.get() && error.get() == null) { + int stopIdx = srvStopIdxs.take(); - while (!done.get()) { - int idx = srvIdx.take(); + log.info("Stop server: " + stopIdx); - stopGrid(idx); - startGrid(idx); + stopGrid(stopIdx); - srvIdx.add(idx); + // Generate unique name to simplify debugging. + int startIdx = srvStartIdx.getAndIncrement(); + + log.info("Start server: " + startIdx); + + Ignite ignite = startGrid(startIdx); + + assertFalse(ignite.configuration().isClientMode()); + + srvStopIdxs.add(startIdx); + } + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + error.compareAndSet(null, e); + + return null; + } + + return null; } + }, + GRID_CNT - 1, + "server-restart"); - return null; + final long timeToExec = getTestTimeout() - 60_000; + + final long endTime = System.currentTimeMillis() + timeToExec; + + while (System.currentTimeMillis() < endTime) { + Thread.sleep(3000); + + if (error.get() != null) { + Throwable err = error.get(); + + U.error(log, "Test failed: " + err.getMessage()); + + done.set(true); + + fut1.cancel(); + fut2.cancel(); + + throw err; } - }, - GRID_CNT - 1 - ); + } - Thread.sleep(getTestTimeout() - 60 * 1000); + log.info("Stop test."); - done.set(true); + done.set(true); - fut1.get(); - fut2.get(); + fut1.get(); + fut2.get(); + } + finally { + done.set(true); + } } /**