Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C8FE18BB7 for ; Fri, 18 Sep 2015 18:30:06 +0000 (UTC) Received: (qmail 88387 invoked by uid 500); 18 Sep 2015 18:30:06 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 88356 invoked by uid 500); 18 Sep 2015 18:30:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 88347 invoked by uid 99); 18 Sep 2015 18:30:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2015 18:30:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E540EE049D; Fri, 18 Sep 2015 18:30:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: debugging ignite-1171 Date: Fri, 18 Sep 2015 18:30:05 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-1171-debug [created] 10ee1a556 debugging ignite-1171 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10ee1a55 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10ee1a55 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10ee1a55 Branch: refs/heads/ignite-1171-debug Commit: 10ee1a5563f106c7f00f7e5a999746da7b944d46 Parents: d3dd2cc Author: Yakov Zhdanov Authored: Fri Sep 18 21:29:52 2015 +0300 Committer: Yakov Zhdanov Committed: Fri Sep 18 21:29:52 2015 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryManager.java | 22 +++- .../communication/tcp/TcpCommunicationSpi.java | 5 +- .../discovery/DiscoverySpiCustomMessage.java | 12 ++- .../ignite/spi/discovery/tcp/ServerImpl.java | 104 +++++++++++++------ .../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +- .../tcp/TcpDiscoveryMultiThreadedTest.java | 2 +- 6 files changed, 104 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index da02b97..eaa66af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -448,8 +449,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { taskNameHash, skipPrimaryCheck); - UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, - autoUnsubscribe, grp.predicate()).get(); + IgniteInternalFuture f = cctx.kernalContext().continuous().startRoutine( + hnd, + bufSize, + timeInterval, + autoUnsubscribe, + grp.predicate()); + + while (!f.isDone()) { + try { + f.get(2000); + } + catch (Exception e) { + U.debug(log, "### Failed to wait for future: " + cctx.gridName() + " " + cctx.nodeId() + " " + f); + } + } + + UUID id = f.get(); if (notifyExisting) { final Iterator it = cctx.cache().allEntries().iterator(); @@ -811,4 +827,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 144a0fd..c93d5af 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException || timeoutHelper.checkFailureTimeoutReached(e))) { - log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + - failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); + if (log.isDebugEnabled()) + log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" + + failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']'); throw e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 373c121..a0f9b75 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,13 +18,15 @@ package org.apache.ignite.spi.discovery; import java.io.Serializable; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.jetbrains.annotations.Nullable; /** * Message to send across ring. * - * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent( - * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage) + * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage) */ public interface DiscoverySpiCustomMessage extends Serializable { /** @@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable { * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. */ public boolean isMutable(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 92f90a1..9d0b3c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; @@ -715,7 +716,14 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { try { - msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, spi.marsh.marshal(evt))); + TcpDiscoveryCustomEventMessage msg = new TcpDiscoveryCustomEventMessage( + getLocalNodeId(), + evt, + spi.marsh.marshal(evt)); + + U.debug(log, "Sending custom event: " + msg); + + msgWorker.addMessage(msg); } catch (IgniteCheckedException e) { throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); @@ -1857,6 +1865,9 @@ class ServerImpl extends TcpDiscoveryImpl { while (msgs.size() > MAX) { TcpDiscoveryAbstractMessage polled = msgs.poll(); + if (polled instanceof DiscoveryCustomMessage) + U.debug("### Discarded custom message ###: " + msg); + assert polled != null; if (polled.id().equals(discardId)) @@ -1865,30 +1876,6 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Resets pending messages. - * - * @param msgs Message. - * @param discardId Discarded message ID. - */ - void reset(@Nullable Collection msgs, @Nullable IgniteUuid discardId) { - this.msgs.clear(); - - if (msgs != null) - this.msgs.addAll(msgs); - - this.discardId = discardId; - } - - /** - * Clears pending messages. - */ - void clear() { - msgs.clear(); - - discardId = null; - } - - /** * Discards message with provided ID and all before it. * * @param id Discarded message ID. @@ -1943,7 +1930,7 @@ class ServerImpl extends TcpDiscoveryImpl { private long connCheckThreshold; /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */ - private Queue pendingCustomMsgs = new LinkedList<>(); + private Queue pendingCustomMsgs = new ArrayDeque<>(); /** Collection to track joining nodes. */ private Set joiningNodes = new HashSet<>(); @@ -2053,6 +2040,8 @@ class ServerImpl extends TcpDiscoveryImpl { sendHeartbeatMessage(); checkHeartbeatsReceiving(); + + checkPendingCustomMessages(); } /** @@ -2326,6 +2315,10 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Pending messages will be sent [failure=" + failure + ", forceSndPending=" + forceSndPending + ']'); + U.debug( + "### Pending messages will be sent [failure=" + failure + + ", forceSndPending=" + forceSndPending + ']'); + if (debugMode) debugLog("Pending messages will be sent [failure=" + failure + ", forceSndPending=" + forceSndPending + ']'); @@ -2337,9 +2330,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (pendingMsg.id().equals(pendingMsgs.discardId)) skip = false; - continue; + if (!(msg instanceof DiscoveryCustomMessage)) + continue; + else + U.debug(log, "Avoid skipping custom message: " + pendingMsg); } + U.debug(log, "Sending pending: " + pendingMsg); + long tstamp = U.currentTimeMillis(); prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, @@ -2361,13 +2359,18 @@ class ServerImpl extends TcpDiscoveryImpl { int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (log.isDebugEnabled()) - log.debug("Pending message has been sent to next node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + log.debug("Pending message has been sent to next node [msgId=" + msg.id() + + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + + ", res=" + res + ']'); + + if (msg instanceof TcpDiscoveryCustomEventMessage) + U.debug(log, "Pending message has been sent to next node [msgId=" + msg.id() + + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); if (debugMode) - debugLog("Pending message has been sent to next node [msg=" + msg.id() + - ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + + debugLog("Pending message has been sent to next node [msgId=" + msg.id() + + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + ", res=" + res + ']'); // Resetting timeout control object to create a new one for the next bunch of @@ -2405,6 +2408,11 @@ class ServerImpl extends TcpDiscoveryImpl { ", next=" + next.id() + ", res=" + res + ']'); + if (msg instanceof TcpDiscoveryCustomEventMessage) + U.debug(log, "Message has been sent to next node [msg=" + msg + + ", next=" + next.id() + + ", res=" + res + ']'); + if (debugMode) debugLog("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + @@ -3132,6 +3140,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.add(node.id()); + U.debug(log, "Added to joining: " + node.id()); + if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { boolean authFailed = true; @@ -3236,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl { n.visible(true); } + joiningNodes.clear(); + locNode.setAttributes(node.attributes()); locNode.visible(true); @@ -3573,6 +3585,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.remove(leftNode.id()); + U.debug(log, "removed from joining 3568: " + leftNode.id()); + spi.stats.onNodeLeft(); notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode); @@ -3731,6 +3745,8 @@ class ServerImpl extends TcpDiscoveryImpl { joiningNodes.remove(node.id()); + U.debug(log, "removed from joining 3728: " + node.id()); + notifyDiscovery(EVT_NODE_FAILED, topVer, node); spi.stats.onNodeFailed(); @@ -4127,6 +4143,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (!joiningNodes.isEmpty()) { pendingCustomMsgs.add(msg); + U.debug(log, "Added to pending: " + msg); + return; } @@ -4138,6 +4156,8 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscoveryListener(msg); + U.debug(log, "Verified: " + msg); + sndNext = true; } else @@ -4171,22 +4191,40 @@ class ServerImpl extends TcpDiscoveryImpl { } } - addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); + U.debug(log, "Discarding custom message: " + msg); + + //addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); } } else { - if (msg.verified()) + if (msg.verified()) { + assert joiningNodes.isEmpty(); + + U.debug(log, "Processing custom message: " + msg); + notifyDiscoveryListener(msg); + } if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); } } + long lastCheck = U.currentTimeMillis(); + /** * Checks and flushes custom event messages if no nodes are attempting to join the grid. */ private void checkPendingCustomMessages() { + if (lastCheck + 2000 < U.currentTimeMillis()) { + U.debug( + log, + "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId=" + locNode.id() + + ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes + ']'); + + lastCheck = U.currentTimeMillis(); + } + if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) { TcpDiscoveryCustomEventMessage msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index e5be530..2786d0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl { return res; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 4f3c9a9..1ccbe1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { stopAllGrids(); } } -} \ No newline at end of file +}