Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 32F5018822 for ; Tue, 22 Sep 2015 02:21:52 +0000 (UTC) Received: (qmail 79517 invoked by uid 500); 22 Sep 2015 02:21:52 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 79434 invoked by uid 500); 22 Sep 2015 02:21:51 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 79232 invoked by uid 99); 22 Sep 2015 02:21:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2015 02:21:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82EE5E052E; Tue, 22 Sep 2015 02:21:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Tue, 22 Sep 2015 02:21:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [9/9] ignite git commit: 1171-debug 1171-debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af6deb8b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af6deb8b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af6deb8b Branch: refs/heads/ignite-1171-debug Commit: af6deb8bb17cc447c1c7e1fd28ef955bcf8ef76c Parents: 72baa62 Author: Alexey Goncharuk Authored: Mon Sep 21 19:21:36 2015 -0700 Committer: Alexey Goncharuk Committed: Mon Sep 21 19:21:36 2015 -0700 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 168 +++++++++++++------ .../messages/TcpDiscoveryDiscardMessage.java | 15 +- .../messages/TcpDiscoveryNodeAddedMessage.java | 33 +++- 3 files changed, 163 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f625d0d..69dd512 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -37,9 +37,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; import java.util.SortedMap; @@ -1370,8 +1372,14 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msgs Messages to include. * @param discardMsgId Discarded message ID. */ - private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection msgs, @Nullable IgniteUuid discardMsgId) { + private void prepareNodeAddedMessage( + TcpDiscoveryAbstractMessage msg, + UUID destNodeId, + @Nullable Collection msgs, + @Nullable IgniteUuid discardMsgId, + @Nullable Collection customMsgs, + @Nullable IgniteUuid discardCustomMsgId + ) { assert destNodeId != null; if (msg instanceof TcpDiscoveryNodeAddedMessage) { @@ -1395,7 +1403,7 @@ class ServerImpl extends TcpDiscoveryImpl { } nodeAddedMsg.topology(topToSnd); - nodeAddedMsg.messages(msgs, discardMsgId); + nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId); Map> hist; @@ -1418,7 +1426,7 @@ class ServerImpl extends TcpDiscoveryImpl { nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); - nodeAddedMsg.messages(null, null); + nodeAddedMsg.messages(null, null, null, null); } } @@ -1827,7 +1835,7 @@ class ServerImpl extends TcpDiscoveryImpl { */ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) { if (msg instanceof TcpDiscoveryNodeAddedMessage) - prepareNodeAddedMessage(msg, destNodeId, null, null); + prepareNodeAddedMessage(msg, destNodeId, null, null, null, null); return msg; } @@ -1836,19 +1844,25 @@ class ServerImpl extends TcpDiscoveryImpl { /** * Pending messages container. */ - private static class PendingMessages { + private static class PendingMessages implements Iterable { /** */ private static final int MAX = 1024; /** Pending messages. */ private final Queue msgs = new ArrayDeque<>(MAX * 2); + /** Pending messages. */ + private final Queue customMsgs = new ArrayDeque<>(MAX * 2); + /** Processed custom message IDs. */ private Set procCustomMsgs = new GridBoundedLinkedHashSet(MAX * 2); /** Discarded message ID. */ private IgniteUuid discardId; + /** Discarded message ID. */ + private IgniteUuid customDiscardId; + /** * Adds pending message and shrinks queue if it exceeds limit * (messages that were not discarded yet are never removed). @@ -1856,10 +1870,12 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void add(TcpDiscoveryAbstractMessage msg) { - msgs.add(msg); + Queue msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs; - while (msgs.size() > MAX) { - TcpDiscoveryAbstractMessage polled = msgs.poll(); + msgs0.add(msg); + + while (msgs0.size() > MAX) { + TcpDiscoveryAbstractMessage polled = msgs0.poll(); assert polled != null; @@ -1874,11 +1890,23 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msgs Message. * @param discardId Discarded message ID. */ - void reset(@Nullable Collection msgs, @Nullable IgniteUuid discardId) { + void reset(@Nullable Collection msgs, @Nullable IgniteUuid discardId, + @Nullable Collection customMsgs, @Nullable IgniteUuid duscardCustomId) { this.msgs.clear(); + this.customMsgs.clear(); - if (msgs != null) - this.msgs.addAll(msgs); + if (msgs != null) { + // Backward compatibility: old nodes send messages in one collection. + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (msg instanceof TcpDiscoveryCustomEventMessage) + this.customMsgs.add(msg); + else + this.msgs.add(msg); + } + } + + if (customMsgs != null) + this.customMsgs.addAll(customMsgs); this.discardId = discardId; } @@ -1888,8 +1916,44 @@ class ServerImpl extends TcpDiscoveryImpl { * * @param id Discarded message ID. */ - void discard(IgniteUuid id) { - discardId = id; + void discard(IgniteUuid id, boolean custom) { + if (custom) + customDiscardId = id; + else + discardId = id; + } + + /** + * Gets iterator for non-discarded messages. + * + * @return Non-discarded messages iterator. + */ + public Iterator iterator() { + Iterator msgIt = msgs.iterator(); + + if (discardId != null) { + while (msgIt.hasNext()) { + TcpDiscoveryAbstractMessage msg = msgIt.next(); + + // Skip all messages before discarded, inclusive. + if (discardId.equals(msg.id())) + break; + } + } + + Iterator customMsgIt = customMsgs.iterator(); + + if (customDiscardId != null) { + while (customMsgIt.hasNext()) { + TcpDiscoveryAbstractMessage msg = customMsgIt.next(); + + // Skip all messages before discarded, inclusive. + if (customDiscardId.equals(msg.id())) + break; + } + } + + return F.concat(msgIt, customMsgIt); } } @@ -2327,21 +2391,11 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog("Pending messages will be sent [failure=" + failure + ", forceSndPending=" + forceSndPending + ']'); - boolean skip = pendingMsgs.discardId != null; - - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { - if (skip) { - if (pendingMsg.id().equals(pendingMsgs.discardId)) - skip = false; - - if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage)) - continue; - } - + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { long tstamp = U.currentTimeMillis(); prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, - pendingMsgs.discardId); + pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId); if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); @@ -2382,7 +2436,8 @@ class ServerImpl extends TcpDiscoveryImpl { msg = new TcpDiscoveryStatusCheckMessage(locNode, null); } else - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId); + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId, + pendingMsgs.customMsgs, pendingMsgs.customDiscardId); try { long tstamp = U.currentTimeMillis(); @@ -2521,17 +2576,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) log.debug("Pending messages will be resent to local node"); - boolean skip = pendingMsgs.discardId != null; - - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { - if (skip) { - if (pendingMsg.id().equals(pendingMsgs.discardId)) - skip = false; - - continue; - } - - prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId); + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { + prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId, + pendingMsgs.customMsgs, pendingMsgs.customDiscardId); msgWorker.addMessage(pendingMsg); @@ -3087,7 +3134,7 @@ class ServerImpl extends TcpDiscoveryImpl { processNodeAddFinishedMessage(addFinishMsg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3130,6 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.add(node.id()); + U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']'); + if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { boolean authFailed = true; @@ -3251,10 +3300,11 @@ class ServerImpl extends TcpDiscoveryImpl { topHist.clear(); topHist.putAll(msg.topologyHistory()); - pendingMsgs.reset(msg.messages(), msg.discardedMessageId()); + pendingMsgs.reset(msg.messages(), msg.discardedMessageId(), + msg.customMessages(), msg.discardedCustomMessageId()); // Clear data to minimize message size. - msg.messages(null, null); + msg.messages(null, null, null, null); msg.topology(null); msg.topologyHistory(null); msg.clearDiscoveryData(); @@ -3321,7 +3371,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3358,6 +3408,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.remove(nodeId); + U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId); + if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) { spi.stats.onNodeJoined(); @@ -3499,7 +3551,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3573,6 +3625,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.remove(leftNode.id()); + U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id()); + spi.stats.onNodeLeft(); notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode); @@ -3672,7 +3726,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); - addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id())); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; } @@ -3731,6 +3785,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.remove(node.id()); + U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id()); + notifyDiscovery(EVT_NODE_FAILED, topVer, node); spi.stats.onNodeFailed(); @@ -4072,7 +4128,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.verified()) - pendingMsgs.discard(msgId); + pendingMsgs.discard(msgId, msg.customMessageDiscard()); if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); @@ -4123,6 +4179,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { + U.debug(log, "Processing custom message: " + msg); + if (isLocalNodeCoordinator()) { if (!joiningNodes.isEmpty()) { pendingCustomMsgs.add(msg); @@ -4133,11 +4191,15 @@ class ServerImpl extends TcpDiscoveryImpl { boolean sndNext = !msg.verified(); if (sndNext) { + U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg); + msg.verify(getLocalNodeId()); msg.topologyVersion(ring.topologyVersion()); if (pendingMsgs.procCustomMsgs.add(msg.id())) notifyDiscoveryListener(msg); + else + sndNext = false; } if (sndNext && ring.hasRemoteNodes()) @@ -4167,6 +4229,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } } + + addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); } } else { @@ -4176,15 +4240,17 @@ class ServerImpl extends TcpDiscoveryImpl { state0 = spiState; } - if (msg.verified() && state0 == CONNECTED) { - assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes; + if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) { + assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id(); - if (pendingMsgs.procCustomMsgs.add(msg.id())) - notifyDiscoveryListener(msg); + notifyDiscoveryListener(msg); } - if (ring.hasRemoteNodes()) + if (ring.hasRemoteNodes()) { + U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']'); + sendMessageAcrossRing(msg); + } } } @@ -5130,7 +5196,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - prepareNodeAddedMessage(msg, clientNodeId, null, null); + prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null); writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java index 1e1fa6b..145f19e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java @@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { /** ID of the message to discard (this and all preceding). */ private final IgniteUuid msgId; + /** True if this is discard ID for custom event message. */ + private final boolean customMsgDiscard; + /** * Constructor. * * @param creatorNodeId Creator node ID. * @param msgId Message ID. */ - public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) { + public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) { super(creatorNodeId); this.msgId = msgId; + this.customMsgDiscard = customMsgDiscard; } /** @@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage { return msgId; } + /** + * Flag indicating whether the ID to discard is for a custom message or not. + * + * @return Custom message flag. + */ + public boolean customMessageDiscard() { + return customMsgDiscard; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 01c6789..789f2b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -48,6 +48,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { /** Discarded message ID. */ private IgniteUuid discardMsgId; + /** Pending messages from previous node. */ + private Collection customMsgs; + + /** Discarded message ID. */ + private IgniteUuid discardCustomMsgId; + /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection top; @@ -117,14 +123,39 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * Gets pending cusotm messages sent to new node by its previous. + * + * @return Pending messages from previous node. + */ + @Nullable public Collection customMessages() { + return customMsgs; + } + + /** + * Gets discarded custom message ID. + * + * @return Discarded message ID. + */ + @Nullable public IgniteUuid discardedCustomMessageId() { + return discardCustomMsgId; + } + + /** * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. * @param discardMsgId Discarded message ID. */ - public void messages(@Nullable Collection msgs, @Nullable IgniteUuid discardMsgId) { + public void messages( + @Nullable Collection msgs, + @Nullable IgniteUuid discardMsgId, + @Nullable Collection customMsgs, + @Nullable IgniteUuid discardCustomMsgId + ) { this.msgs = msgs; this.discardMsgId = discardMsgId; + this.customMsgs = customMsgs; + this.discardCustomMsgId = discardCustomMsgId; } /**