Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90BB2200BD1 for ; Mon, 28 Nov 2016 11:53:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8F704160B0D; Mon, 28 Nov 2016 10:53:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3F1CE160B06 for ; Mon, 28 Nov 2016 11:53:52 +0100 (CET) Received: (qmail 8937 invoked by uid 500); 28 Nov 2016 10:53:51 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 8925 invoked by uid 99); 28 Nov 2016 10:53:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Nov 2016 10:53:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4376CE04BB; Mon, 28 Nov 2016 10:53:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <9f9f8772fd7b4787a475c28f6c563392@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-4296-1 Date: Mon, 28 Nov 2016 10:53:51 +0000 (UTC) archived-at: Mon, 28 Nov 2016 10:53:53 -0000 Repository: ignite Updated Branches: refs/heads/ignite-4296-1 [created] 39532a3ee ignite-4296-1 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39532a3e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39532a3e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39532a3e Branch: refs/heads/ignite-4296-1 Commit: 39532a3eed0c4f37052f8e134ca9f71c9c82d348 Parents: 460a262 Author: sboikov Authored: Mon Nov 28 13:01:49 2016 +0300 Committer: sboikov Committed: Mon Nov 28 13:01:49 2016 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 20 +++ .../cache/CacheAffinitySharedManager.java | 8 ++ .../GridCachePartitionExchangeManager.java | 18 +++ .../GridDhtPartitionsExchangeFuture.java | 104 +++++++++++++- .../ignite/spi/discovery/tcp/ServerImpl.java | 142 ++++++++++++++++--- .../tcp/internal/TcpDiscoveryStatistics.java | 12 +- .../TcpDiscoveryJoinRequestMessage.java | 13 ++ 7 files changed, 292 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 488dabe..662fa68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1621,6 +1621,15 @@ public class GridDiscoveryManager extends GridManagerAdapter { } /** + * @param topVer Topology version. + * @param node Node. + * @return DHT caches started on given node. + */ + public Collection nodeCaches(AffinityTopologyVersion topVer, ClusterNode node) { + return resolveDiscoCache(null, topVer).nodeCaches(node); + } + + /** * Gets cache remote nodes for cache with given name. * * @param topVer Topology version. @@ -2692,6 +2701,17 @@ public class GridDiscoveryManager extends GridManagerAdapter { this.nodeMap = nodeMap; } + Collection nodeCaches(ClusterNode node) { + List res = new ArrayList<>(); + + for (Map.Entry> e : affCacheNodes.entrySet()) { + if (F.contains(e.getValue(), node)) + res.add(e.getKey()); + } + + return res; + } + /** * Adds node to map. * http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 2890887..77563ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1517,6 +1517,14 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap return assignment; } + public GridAffinityAssignmentCache cacheAssignment(Integer cacheId) { + CacheHolder holder = caches.get(cacheId); + + assert holder != null; + + return holder.affinity(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7a24aa1..2d2f738 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -102,6 +102,7 @@ import org.jsr166.ConcurrentLinkedDeque8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getLong; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -119,6 +120,9 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** Exchange history size. */ private static final int EXCHANGE_HISTORY_SIZE = 1000; + /** */ + private boolean skipFirstExchangeMsg; + /** Atomic reference for pending timeout object. */ private AtomicReference pendingResend = new AtomicReference<>(); @@ -300,10 +304,24 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } }; + public boolean skipFirstExchangeMessage() { + return skipFirstExchangeMsg; + } + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { super.start0(); + if (getBoolean("SKIP_FIRST_EXCHANGE_MSG", false)) { + if (cctx.kernalContext().config().isLateAffinityAssignment()) { + skipFirstExchangeMsg = true; + + cctx.kernalContext().addNodeAttribute("SKIP_FIRST_EXCHANGE_MSG", true); + } + else + U.warn(log, "Can not use SKIP_FIRST_EXCHANGE_MSG optimization when late affinity assignment disabled."); + } + exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e945de9..4e269cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -81,6 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; /** * Future for exchanging partition maps. @@ -115,6 +116,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter srvNodes; @@ -435,6 +445,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter caches = cctx.discovery().nodeCaches(topologyVersion(), node); + + if (!caches.isEmpty()) { + GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(), + false, + null, + false); + + for (String cache : caches) { + Map m = new HashMap<>(); + + GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(), 1, topologyVersion(), m, true); + + GridAffinityAssignmentCache assign = cctx.affinity().cacheAssignment(CU.cacheId(cache)); + + for (Integer part : assign.primaryPartitions(node.id(), topologyVersion())) + partMap.put(part, MOVING); + for (Integer part : assign.backupPartitions(node.id(), topologyVersion())) + partMap.put(part, MOVING); + + msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null); + } + + updatePartitionSingleMap(msg); + } + } + if (remaining.isEmpty()) onAllReceived(false); } - else - sendPartitions(crd); + else { + boolean skipSnd = false; + + if (cctx.exchange().skipFirstExchangeMessage() && discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal()) + skipSnd = true; + + if (!skipSnd) { + long sndStart = System.currentTimeMillis(); + + sendPartitions(crd); + + log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - sndStart) + ']'); + } + else + log.info("Skip first exchange message [topVer=" + topologyVersion() + ']'); + } initDone(); } @@ -1051,9 +1125,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter 0) { + log.info("Completed partition exchange [maxSingleUpdate=" + singleMsgUpdateMaxTime + + ", minSingleUpdate=" + singleMsgUpdateMinTime + + ", duration=" + duration() + + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + } + else { + log.info("Completed partition exchange [duration=" + duration() + + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + } + if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + - "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); + ", duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); initFut.onDone(err == null); @@ -1168,6 +1253,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter singleMsgUpdateMaxTime) + singleMsgUpdateMaxTime = time; + if (time < singleMsgUpdateMinTime) + singleMsgUpdateMinTime = time; + assert pendingSingleUpdates > 0; pendingSingleUpdates--; @@ -1252,6 +1348,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter errs = null; @@ -1188,7 +1195,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Connection has been established, but // join request may not be unmarshalled on remote host. // E.g. due to class not found issue. - joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; + joinReqSent = join; int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); @@ -3762,8 +3769,79 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeAddedMsg.verified()) msgHist.add(nodeAddedMsg); } - else if (sendMessageToRemotes(msg)) - sendMessageAcrossRing(msg); + else { + if (sendMessageToRemotes(msg)) { + if (SEND_JOIN_REQ_DIRECTLY && !msg.directSendFailed()) { + final TcpDiscoveryNode crd = resolveCoordinator(); + + Collection failedNodes; + + synchronized (mux) { + failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet()); + } + + TcpDiscoveryNode next = ring.nextNode(failedNodes); + + if (crd != null && !crd.equals(next)) { + if (log.isDebugEnabled()) { + log.debug("Will send join request directly to coordinator " + + "[msg=" + msg + ", crd=" + crd + ", next=" + next + ']'); + } + + log.info("Will send join request directly to coordinator " + + "[cnt=" + joiningNodes.size() + ", msg=" + msg + ", crd=" + crd + ", next=" + next + ']'); + + utilityPool.submit(new Runnable() { + @Override public void run() { + IgniteSpiException sndErr = null; + Integer res = null; + + TcpDiscoveryJoinRequestMessage msg0 = + new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData()); + + try { + res = trySendMessageDirectly(crd, msg0); + + if (F.eq(RES_OK, res)) { + if (log.isDebugEnabled()) { + log.debug("Sent join request directly to coordinator " + + "[msg=" + msg0 + ", crd=" + crd + ']'); + } + + log.info("Sent join request directly to coordinator " + + "[msg=" + msg0 + ", crd=" + crd + ']'); + + return; + } + } + catch (IgniteSpiException e) { + sndErr = e; + } + + if (log.isDebugEnabled()) { + log.debug("Failed to send join request to coordinator, will process from " + + "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr + + ", res=" + res + ']'); + } + + log.info("Failed to send join request to coordinator, will process from " + + "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr + + ", res=" + res + ']'); + + msg.directSendFailed(true); + + msgWorker.addMessage(msg); + } + }); + + return; + } + } + + sendMessageAcrossRing(msg); + } + + } } /** @@ -3798,7 +3876,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message. * @throws IgniteSpiException Last failure if all attempts failed. */ - private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg) + private Integer trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg) throws IgniteSpiException { if (node.isClient()) { TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId()); @@ -3819,25 +3897,21 @@ class ServerImpl extends TcpDiscoveryImpl { worker.addMessage(msg); - return; + return null; } - trySendMessageDirectly(routerNode, msg); - - return; + return trySendMessageDirectly(routerNode, msg); } IgniteSpiException ex = null; for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { - sendMessageDirectly(msg, addr); + Integer res = sendMessageDirectly(msg, addr, false); node.lastSuccessfulAddress(addr); - ex = null; - - break; + return res; } catch (IgniteSpiException e) { ex = e; @@ -3846,6 +3920,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (ex != null) throw ex; + + return null; } /** @@ -3968,7 +4044,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (isLocalNodeCoordinator()) { if (msg.verified()) { - spi.stats.onRingMessageReceived(msg); + Long time = spi.stats.onRingMessageReceived(msg); + + if (time != null) + log.info("TcpDiscoveryNodeAddedMessage ring time: " + time); TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId, node.id()); @@ -4277,7 +4356,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeCoord) { if (msg.verified()) { - spi.stats.onRingMessageReceived(msg); + Long time = spi.stats.onRingMessageReceived(msg); + + if (time != null) + log.info("TcpDiscoveryNodeAddFinishedMessage ring time: " + time); addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); @@ -5164,7 +5246,10 @@ class ServerImpl extends TcpDiscoveryImpl { else { addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); - spi.stats.onRingMessageReceived(msg); + Long time = spi.stats.onRingMessageReceived(msg); + + if (time != null) + log.info("TcpDiscoveryCustomEventMessage ring time: " + time); DiscoverySpiCustomMessage msgObj = null; @@ -5561,6 +5646,9 @@ class ServerImpl extends TcpDiscoveryImpl { ClientMessageWorker clientMsgWrk = null; + TcpDiscoveryAbstractMessage msg = null; + Exception sockE = null; + try { InputStream in; @@ -5621,7 +5709,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Restore timeout. sock.setSoTimeout(timeout); - TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); + msg = spi.readMessage(sock, in, spi.netTimeout); // Ping. if (msg instanceof TcpDiscoveryPingRequest) { @@ -5726,6 +5814,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } catch (IOException e) { + sockE = e; + if (log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); @@ -5753,6 +5843,8 @@ class ServerImpl extends TcpDiscoveryImpl { return; } catch (IgniteCheckedException e) { + sockE = e; + if (log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); @@ -5782,8 +5874,7 @@ class ServerImpl extends TcpDiscoveryImpl { while (!isInterrupted()) { try { - TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, - U.resolveClassLoader(spi.ignite().configuration())); + msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration())); msg.senderNodeId(nodeId); @@ -5810,9 +5901,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (clientMsgWrk != null && ok) continue; - else + else { + log.info("Processed join request, close connection [msg=" + msg + ']'); + // Direct join request - no need to handle this socket anymore. break; + } } } else if (msg instanceof TcpDiscoveryClientReconnectMessage) { @@ -5985,6 +6079,8 @@ class ServerImpl extends TcpDiscoveryImpl { processClientHeartbeatMessage(heartbeatMsg); } catch (IgniteCheckedException e) { + sockE = e; + if (log.isDebugEnabled()) U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e); @@ -6012,6 +6108,8 @@ class ServerImpl extends TcpDiscoveryImpl { return; } catch (IOException e) { + sockE = e; + if (log.isDebugEnabled()) U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e); @@ -6035,6 +6133,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } finally { + log.info("Close sock [readers=" + spi.stats.socketReaders() + ", msg=" + msg + ", err=" + sockE + ']'); + if (clientMsgWrk != null) { if (log.isDebugEnabled()) log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java index bb5ab66..e725d00 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; @@ -320,6 +321,7 @@ public class TcpDiscoveryStatistics { if (crdSinceTs.get() > 0 && (msg instanceof TcpDiscoveryCustomEventMessage) || (msg instanceof TcpDiscoveryNodeAddedMessage) || + (msg instanceof TcpDiscoveryNodeAddFinishedMessage) || (msg instanceof TcpDiscoveryNodeLeftMessage) || (msg instanceof TcpDiscoveryNodeFailedMessage)) { ringMsgsSndTs.put(msg.id(), U.currentTimeMillis()); @@ -383,7 +385,7 @@ public class TcpDiscoveryStatistics { * * @param msg Message. */ - public synchronized void onRingMessageReceived(TcpDiscoveryAbstractMessage msg) { + public synchronized Long onRingMessageReceived(TcpDiscoveryAbstractMessage msg) { assert msg != null; Long sentTs = ringMsgsSndTs.get(msg.id()); @@ -399,7 +401,11 @@ public class TcpDiscoveryStatistics { if (ringMsgsSent != 0) avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent; + + return duration; } + + return null; } /** @@ -630,6 +636,10 @@ public class TcpDiscoveryStatistics { return sockReadersCreated; } + public synchronized int socketReaders() { + return sockReadersCreated - sockReadersRmv; + } + /** * Gets socket readers removed count. * http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java index 22ffae8..4422919 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Map; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; @@ -36,6 +37,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage /** Discovery data. */ private final Map discoData; + /** */ + @GridToStringExclude + private transient boolean directSndFailed; + /** * Constructor. * @@ -79,6 +84,14 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage setFlag(RESPONDED_FLAG_POS, responded); } + public boolean directSendFailed() { + return directSndFailed; + } + + public void directSendFailed(boolean directSndFailed) { + this.directSndFailed = directSndFailed; + } + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { // NOTE!