Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2D2C2183B9 for ; Tue, 16 Jun 2015 06:44:52 +0000 (UTC) Received: (qmail 19370 invoked by uid 500); 16 Jun 2015 06:44:52 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 19338 invoked by uid 500); 16 Jun 2015 06:44:52 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 19329 invoked by uid 99); 16 Jun 2015 06:44:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 06:44:52 +0000 X-ASF-Spam-Status: No, hits=-2000.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 16 Jun 2015 06:42:36 +0000 Received: (qmail 18774 invoked by uid 99); 16 Jun 2015 06:44:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 06:44:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CF8B5E35DF; Tue, 16 Jun 2015 06:44:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 16 Jun 2015 06:44:22 -0000 Message-Id: <04ffd0d5910e4b2fac4315f2e1bba2fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/9] incubator-ignite git commit: # ignite-883 X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-6 090733868 -> 5d8a5e619 # ignite-883 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/de0d61f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/de0d61f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/de0d61f6 Branch: refs/heads/ignite-sprint-6 Commit: de0d61f6a3467379c168a15bbd9c4580aeebb092 Parents: 89a4f7c Author: sboikov Authored: Thu Jun 11 10:40:27 2015 +0300 Committer: sboikov Committed: Thu Jun 11 17:40:01 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 1 - .../discovery/GridDiscoveryManager.java | 9 +- .../dht/preloader/GridDhtPreloader.java | 2 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 28 ++- .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 214 ++++++++++++------- .../ignite/spi/discovery/tcp/ServerImpl.java | 64 ------ .../spi/discovery/tcp/TcpDiscoveryImpl.java | 66 ++++++ .../distributed/IgniteCacheManyClientsTest.java | 87 +++++++- 9 files changed, 323 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 4f5e365..f38fee1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1001,7 +1001,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { A.notNull(cfg.getMBeanServer(), "cfg.getMBeanServer()"); A.notNull(cfg.getGridLogger(), "cfg.getGridLogger()"); A.notNull(cfg.getMarshaller(), "cfg.getMarshaller()"); - A.notNull(cfg.getPublicThreadPoolSize(), "cfg.getPublicThreadPoolSize()"); A.notNull(cfg.getUserAttributes(), "cfg.getUserAttributes()"); // All SPIs should be non-null. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 71fbc61..464110c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2057,17 +2057,22 @@ public class GridDiscoveryManager extends GridManagerAdapter { private final AffinityTopologyVersion topVer; /** */ + @GridToStringExclude private final DiscoCache discoCache; /** * @param topVer Topology version. * @param discoCache Disco cache. */ - private Snapshot(AffinityTopologyVersion topVer, - DiscoCache discoCache) { + private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) { this.topVer = topVer; this.discoCache = discoCache; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Snapshot.class, this); + } } /** Cache for discovery collections. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 51010ce..0355bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -102,7 +102,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { boolean set = topVer.setIfGreater(e.topologyVersion()); assert set : "Have you configured TcpDiscoverySpi for your in-memory data grid? [newVer=" + - e.topologyVersion() + ", curVer=" + topVer.get() + ']'; + e.topologyVersion() + ", curVer=" + topVer.get() + ", evt=" + e + ']'; if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 6e7a706..476f8a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -585,8 +585,32 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement GridDummySpiContext(ClusterNode locNode, boolean stopping, @Nullable IgniteSpiContext spiCtx) { this.locNode = locNode; this.stopping = stopping; - this.msgFactory = spiCtx != null ? spiCtx.messageFactory() : null; - this.msgFormatter = spiCtx != null ? spiCtx.messageFormatter() : null; + + MessageFactory msgFactory0 = spiCtx != null ? spiCtx.messageFactory() : null; + MessageFormatter msgFormatter0 = spiCtx != null ? spiCtx.messageFormatter() : null; + + if (msgFactory0 == null) { + msgFactory0 = new MessageFactory() { + @Nullable @Override public Message create(byte type) { + throw new IgniteException("Failed to read message, node is not started."); + } + }; + } + + if (msgFormatter0 == null) { + msgFormatter0 = new MessageFormatter() { + @Override public MessageWriter writer() { + throw new IgniteException("Failed to write message, node is not started."); + } + + @Override public MessageReader reader(MessageFactory factory) { + throw new IgniteException("Failed to read message, node is not started."); + } + }; + } + + this.msgFactory = msgFactory0; + this.msgFormatter = msgFormatter0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index a661965..f19e25b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + - "configuration property) [addr=" + addr + ']'); + "configuration property) [addr=" + addr + ", connTimeout" + connTimeout + ']'); if (errs == null) errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index d064c8d..23297ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -195,7 +195,7 @@ class ClientImpl extends TcpDiscoveryImpl { U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); } catch (InterruptedException ignored) { - + // No-op. } } @@ -282,7 +282,7 @@ class ClientImpl extends TcpDiscoveryImpl { return false; } catch (IgniteCheckedException e) { - throw new IgniteSpiException(e); // Should newer occur + throw new IgniteSpiException(e); // Should newer occur. } } @@ -347,7 +347,10 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * @param recon {@code True} if reconnects. * @return Opened socket or {@code null} if timeout. + * @throws InterruptedException If interrupted. + * @throws IgniteSpiException If failed. * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") @@ -387,71 +390,152 @@ class ClientImpl extends TcpDiscoveryImpl { InetSocketAddress addr = it.next(); - Socket sock = null; + T2 sockAndRes = sendJoinRequest(recon, addr); - try { - long ts = U.currentTimeMillis(); + if (sockAndRes == null) { + it.remove(); - IgniteBiTuple t = initConnection(addr); + continue; + } - sock = t.get1(); + assert sockAndRes.get1() != null; + assert sockAndRes.get2() != null; - UUID rmtNodeId = t.get2(); + Socket sock = sockAndRes.get1(); - spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts); + switch (sockAndRes.get2()) { + case RES_OK: + return sock; - locNode.clientRouterNodeId(rmtNodeId); + case RES_CONTINUE_JOIN: + case RES_WAIT: + U.closeQuiet(sock); - TcpDiscoveryAbstractMessage msg = recon ? - new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, - lastMsgId) : - new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); + break; - msg.client(true); + default: + if (log.isDebugEnabled()) + log.debug("Received unexpected response to join request: " + sockAndRes.get2()); - spi.writeToSocket(sock, msg); + U.closeQuiet(sock); + } + } - int res = spi.readReceipt(sock, spi.ackTimeout); + if (addrs.isEmpty()) { + if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) + return null; - switch (res) { - case RES_OK: - return sock; + Thread.sleep(2000); - case RES_CONTINUE_JOIN: - case RES_WAIT: - U.closeQuiet(sock); + U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + + "in 2000ms): " + addrs0); + } + } + } - break; + /** + * @param recon {@code True} if reconnects. + * @param addr Address. + * @return Socket and connect response. + */ + @Nullable private T2 sendJoinRequest(boolean recon, InetSocketAddress addr) { + assert addr != null; - default: - if (log.isDebugEnabled()) - log.debug("Received unexpected response to join request: " + res); + Collection errs = null; - U.closeQuiet(sock); - } - } - catch (IOException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to establish connection with address: " + addr, e); + long ackTimeout0 = spi.ackTimeout; - U.closeQuiet(sock); + int connectAttempts = 1; - it.remove(); - } + UUID locNodeId = getLocalNodeId(); + + for (int i = 0; i < spi.reconCnt; i++) { + boolean openSock = false; + + Socket sock = null; + + try { + long tstamp = U.currentTimeMillis(); + + sock = spi.openSocket(addr); + + openSock = true; + + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + + req.client(true); + + spi.writeToSocket(sock, req); + + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + + UUID rmtNodeId = res.creatorNodeId(); + + assert rmtNodeId != null; + assert !getLocalNodeId().equals(rmtNodeId); + + spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp); + + locNode.clientRouterNodeId(rmtNodeId); + + tstamp = U.currentTimeMillis(); + + TcpDiscoveryAbstractMessage msg = recon ? + new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) : + new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); + + msg.client(true); + + spi.writeToSocket(sock, msg); + + spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + + if (log.isDebugEnabled()) + log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + + ", rmtNodeId=" + rmtNodeId + ']'); + + return new T2<>(sock, spi.readReceipt(sock, ackTimeout0)); } + catch (IOException | IgniteCheckedException e) { + U.closeQuiet(sock); - if (addrs.isEmpty()) { - U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + - "in 2000ms): " + addrs0); + if (log.isDebugEnabled()) + log.error("Exception on joining: " + e.getMessage(), e); - if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) - return null; + onException("Exception on joining: " + e.getMessage(), e); - Thread.sleep(2000); + if (errs == null) + errs = new ArrayList<>(); + + errs.add(e); + + if (!openSock) { + // Reconnect for the second time, if connection is not established. + if (connectAttempts < 2) { + connectAttempts++; + + continue; + } + + break; // Don't retry if we can not establish connection. + } + + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { + ackTimeout0 *= 2; + + if (!checkAckTimeout(ackTimeout0)) + break; + } } } + + if (log.isDebugEnabled()) + log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ", errs=" + errs + ']'); + + return null; } + /** * @param topVer New topology version. * @return Latest topology snapshot. @@ -493,33 +577,6 @@ class ClientImpl extends TcpDiscoveryImpl { return allNodes; } - /** - * @param addr Address. - * @return Remote node ID. - * @throws IOException In case of I/O error. - * @throws IgniteCheckedException In case of other error. - */ - private IgniteBiTuple initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException { - assert addr != null; - - Socket sock = spi.openSocket(addr); - - TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId()); - - req.client(true); - - spi.writeToSocket(sock, req); - - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout); - - UUID nodeId = res.creatorNodeId(); - - assert nodeId != null; - assert !getLocalNodeId().equals(nodeId); - - return F.t(sock, nodeId); - } - /** {@inheritDoc} */ @Override void simulateNodeFailure() { U.warn(log, "Simulating client node failure: " + getLocalNodeId()); @@ -736,7 +793,7 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * + * @return {@code True} if connection is alive. */ public boolean isOnline() { synchronized (mux) { @@ -780,7 +837,8 @@ class ClientImpl extends TcpDiscoveryImpl { } catch (IOException e) { if (log.isDebugEnabled()) - U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e); + U.error(log, "Failed to send node left message (will stop anyway) " + + "[sock=" + sock + ", msg=" + msg + ']', e); U.closeQuiet(sock); @@ -909,7 +967,7 @@ class ClientImpl extends TcpDiscoveryImpl { final Socket sock = joinTopology(false); if (sock == null) { - joinErr = new IgniteSpiException("Join process timed out"); + joinErr = new IgniteSpiException("Join process timed out."); joinLatch.countDown(); @@ -934,8 +992,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (msg == JOIN_TIMEOUT) { if (joinLatch.getCount() > 0) { - joinErr = new IgniteSpiException("Join process timed out [sock=" + sock + - ", timeout=" + spi.netTimeout + ']'); + joinErr = new IgniteSpiException("Join process timed out, did not receive response for " + + "join request (consider increasing 'networkTimeout' configuration property) " + + "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']'); joinLatch.countDown(); @@ -1027,7 +1086,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (joinLatch.getCount() > 0) { // This should not occurs. - joinErr = new IgniteSpiException("Some error occurs in joinig process"); + joinErr = new IgniteSpiException("Some error occur in join process."); joinLatch.countDown(); } @@ -1236,8 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (spi.getSpiContext().isStopping()) { if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) { if (leaveLatch.getCount() > 0) { - log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId() - + ", rmtNode=" + msg.creatorNodeId() + ']'); + if (log.isDebugEnabled()) + log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId() + + ", rmtNode=" + msg.creatorNodeId() + ']'); leaveLatch.countDown(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5aceaae..311c783 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -125,16 +125,6 @@ class ServerImpl extends TcpDiscoveryImpl { private final ConcurrentMap>> pingMap = new ConcurrentHashMap8<>(); - /** Debug mode. */ - private boolean debugMode; - - /** Debug messages history. */ - private int debugMsgHist = 512; - - /** Received messages. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private ConcurrentLinkedDeque debugLog; - /** * @param adapter Adapter. */ @@ -142,24 +132,6 @@ class ServerImpl extends TcpDiscoveryImpl { super(adapter); } - /** - * This method is intended for troubleshooting purposes only. - * - * @param debugMode {code True} to start SPI in debug mode. - */ - public void setDebugMode(boolean debugMode) { - this.debugMode = debugMode; - } - - /** - * This method is intended for troubleshooting purposes only. - * - * @param debugMsgHist Message history log size. - */ - public void setDebugMessageHistory(int debugMsgHist) { - this.debugMsgHist = debugMsgHist; - } - /** {@inheritDoc} */ @Override public String getSpiState() { synchronized (mux) { @@ -1060,23 +1032,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * @param ackTimeout Acknowledgement timeout. - * @return {@code True} if acknowledgement timeout is less or equal to - * maximum acknowledgement timeout, {@code false} otherwise. - */ - private boolean checkAckTimeout(long ackTimeout) { - if (ackTimeout > spi.maxAckTimeout) { - LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + - "(consider increasing 'maxAckTimeout' configuration property) " + - "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); - - return false; - } - - return true; - } - - /** * Notify external listener on discovery event. * * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details. @@ -1422,25 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Message. - */ - private void debugLog(String msg) { - assert debugMode; - - String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + - '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + - "-" + locNode.internalOrder() + "] " + - msg; - - debugLog.add(msg0); - - int delta = debugLog.size() - debugMsgHist; - - for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) - debugLog.poll(); - } - - /** - * @param msg Message. * @return {@code True} if recordable in debug mode. */ private boolean recordable(TcpDiscoveryAbstractMessage msg) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index b7e9e53..94097c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -26,7 +26,9 @@ import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.discovery.tcp.internal.*; import org.jetbrains.annotations.*; +import java.text.*; import java.util.*; +import java.util.concurrent.*; /** * @@ -50,6 +52,16 @@ abstract class TcpDiscoveryImpl { /** */ protected TcpDiscoveryNode locNode; + /** Debug mode. */ + protected boolean debugMode; + + /** Debug messages history. */ + private int debugMsgHist = 512; + + /** Received messages. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + protected ConcurrentLinkedDeque debugLog; + /** * @param spi Adapter. */ @@ -60,6 +72,43 @@ abstract class TcpDiscoveryImpl { } /** + * This method is intended for troubleshooting purposes only. + * + * @param debugMode {code True} to start SPI in debug mode. + */ + public void setDebugMode(boolean debugMode) { + this.debugMode = debugMode; + } + + /** + * This method is intended for troubleshooting purposes only. + * + * @param debugMsgHist Message history log size. + */ + public void setDebugMessageHistory(int debugMsgHist) { + this.debugMsgHist = debugMsgHist; + } + + /** + * @param msg Message. + */ + protected void debugLog(String msg) { + assert debugMode; + + String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) + + '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() + + "-" + locNode.internalOrder() + "] " + + msg; + + debugLog.add(msg0); + + int delta = debugLog.size() - debugMsgHist; + + for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) + debugLog.poll(); + } + + /** * @return Local node ID. */ public UUID getLocalNodeId() { @@ -209,4 +258,21 @@ abstract class TcpDiscoveryImpl { } } } + + /** + * @param ackTimeout Acknowledgement timeout. + * @return {@code True} if acknowledgement timeout is less or equal to + * maximum acknowledgement timeout, {@code false} otherwise. + */ + protected boolean checkAckTimeout(long ackTimeout) { + if (ackTimeout > spi.maxAckTimeout) { + LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " + + "(consider increasing 'maxAckTimeout' configuration property) " + + "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']'); + + return false; + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index 24ebb7c..77ddd40 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -18,15 +18,17 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -54,6 +56,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setConnectorConfiguration(null); + cfg.setPeerClassLoadingEnabled(false); + cfg.setTimeServerPortRange(200); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); if (!clientDiscovery) @@ -61,6 +69,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { cfg.setClientMode(client); + if (client) { +// cfg.setPublicThreadPoolSize(1); +// cfg.setPeerClassLoadingThreadPoolSize(1); +// cfg.setIgfsThreadPoolSize(1); + } + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); @@ -85,6 +99,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { stopAllGrids(); } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60_000; + } + /** * @throws Exception If failed. */ @@ -104,6 +123,66 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testManyClientsSequentiallyClientDiscovery() throws Exception { + clientDiscovery = true; + + manyClientsSequentially(); + } + + /** + * @throws Exception If failed. + */ + private void manyClientsSequentially() throws Exception { + client = true; + + List clients = new ArrayList<>(); + + final int CLIENTS = 50; + + int idx = SRVS; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = startGrid(idx++); + + log.info("Started node: " + ignite.name()); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + + IgniteCache cache = ignite.cache(null); + + Integer key = rnd.nextInt(0, 1000); + + cache.put(key, i); + + assertNotNull(cache.get(key)); + } + + log.info("All clients started."); + + assertEquals(SRVS + CLIENTS, G.allGrids().size()); + + long topVer = -1L; + + for (Ignite ignite : G.allGrids()) { + assertEquals(SRVS + CLIENTS, ignite.cluster().nodes().size()); + + if (topVer == -1L) + topVer = ignite.cluster().topologyVersion(); + else + assertEquals(topVer, ignite.cluster().topologyVersion()); + } + + for (Ignite client : clients) + client.close(); + } + + /** + * @throws Exception If failed. + */ private void manyClientsPutGet() throws Exception { client = true; @@ -111,7 +190,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { final AtomicBoolean stop = new AtomicBoolean(); - final int THREADS = 30; + final int THREADS = 50; final CountDownLatch latch = new CountDownLatch(THREADS); @@ -143,6 +222,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { cache.put(key, iter++); assertNotNull(cache.get(key)); + + Thread.sleep(1); } log.info("Stopping node: " + ignite.name()); @@ -154,6 +235,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { latch.await(); + log.info("All clients started."); + Thread.sleep(10_000); log.info("Stop clients.");