ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [01/23] ignite git commit: debugging ignite-1171
Date Wed, 23 Sep 2015 01:36:30 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1171 46a22e3a7 -> f5dcaf35e


debugging ignite-1171


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10ee1a55
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10ee1a55
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10ee1a55

Branch: refs/heads/ignite-1171
Commit: 10ee1a5563f106c7f00f7e5a999746da7b944d46
Parents: d3dd2cc
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Fri Sep 18 21:29:52 2015 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Fri Sep 18 21:29:52 2015 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  22 +++-
 .../communication/tcp/TcpCommunicationSpi.java  |   5 +-
 .../discovery/DiscoverySpiCustomMessage.java    |  12 ++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 104 +++++++++++++------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   6 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |   2 +-
 6 files changed, 104 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..eaa66af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -448,8 +449,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             taskNameHash,
             skipPrimaryCheck);
 
-        UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
-            autoUnsubscribe, grp.predicate()).get();
+        IgniteInternalFuture<UUID> f = cctx.kernalContext().continuous().startRoutine(
+            hnd,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            grp.predicate());
+
+        while (!f.isDone()) {
+            try {
+                f.get(2000);
+            }
+            catch (Exception e) {
+                U.debug(log, "### Failed to wait for future: " + cctx.gridName() + " " +
cctx.nodeId() + " " + f);
+            }
+        }
+
+        UUID id = f.get();
 
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -811,4 +827,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 144a0fd..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException
||
                     timeoutHelper.checkFailureTimeoutReached(e))) {
-                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout="
+
-                        failureDetectionTimeout() + ", err=" + e.getMessage() + ", client="
+ client + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout="
+
+                            failureDetectionTimeout() + ", err=" + e.getMessage() + ", client="
+ client + ']');
 
                     throw e;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
 package org.apache.ignite.spi.discovery;
 
 import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Message to send across ring.
  *
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
      * @return {@code true} if message can be modified during listener notification. Changes
will be send to next nodes.
      */
     public boolean isMutable();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92f90a1..9d0b3c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -715,7 +716,14 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
-            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
spi.marsh.marshal(evt)));
+            TcpDiscoveryCustomEventMessage msg = new TcpDiscoveryCustomEventMessage(
+                getLocalNodeId(),
+                evt,
+                spi.marsh.marshal(evt));
+
+            U.debug(log, "Sending custom event: " + msg);
+
+            msgWorker.addMessage(msg);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -1857,6 +1865,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             while (msgs.size() > MAX) {
                 TcpDiscoveryAbstractMessage polled = msgs.poll();
 
+                if (polled instanceof DiscoveryCustomMessage)
+                    U.debug("### Discarded custom message ###: " + msg);
+
                 assert polled != null;
 
                 if (polled.id().equals(discardId))
@@ -1865,30 +1876,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Resets pending messages.
-         *
-         * @param msgs Message.
-         * @param discardId Discarded message ID.
-         */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable
IgniteUuid discardId) {
-            this.msgs.clear();
-
-            if (msgs != null)
-                this.msgs.addAll(msgs);
-
-            this.discardId = discardId;
-        }
-
-        /**
-         * Clears pending messages.
-         */
-        void clear() {
-            msgs.clear();
-
-            discardId = null;
-        }
-
-        /**
          * Discards message with provided ID and all before it.
          *
          * @param id Discarded message ID.
@@ -1943,7 +1930,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private long connCheckThreshold;
 
         /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished
messages. */
-        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
 
         /** Collection to track joining nodes. */
         private Set<UUID> joiningNodes = new HashSet<>();
@@ -2053,6 +2040,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             sendHeartbeatMessage();
 
             checkHeartbeatsReceiving();
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -2326,6 +2315,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Pending messages will be sent [failure=" +
failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
+                                U.debug(
+                                    "### Pending messages will be sent [failure=" + failure
+
+                                        ", forceSndPending=" + forceSndPending + ']');
+
                                 if (debugMode)
                                     debugLog("Pending messages will be sent [failure=" +
failure +
                                         ", forceSndPending=" + forceSndPending + ']');
@@ -2337,9 +2330,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
 
-                                        continue;
+                                        if (!(msg instanceof DiscoveryCustomMessage))
+                                            continue;
+                                        else
+                                            U.debug(log, "Avoid skipping custom message:
" + pendingMsg);
                                     }
 
+                                    U.debug(log, "Sending pending: " + pendingMsg);
+
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
@@ -2361,13 +2359,18 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
-                                        log.debug("Pending message has been sent to next
node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" +
next.id() +
+                                        log.debug("Pending message has been sent to next
node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next="
+ next.id() +
+                                            ", res=" + res + ']');
+
+                                    if (msg instanceof TcpDiscoveryCustomEventMessage)
+                                        U.debug(log, "Pending message has been sent to next
node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next="
+ next.id() +
                                             ", res=" + res + ']');
 
                                     if (debugMode)
-                                        debugLog("Pending message has been sent to next node
[msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" +
next.id() +
+                                        debugLog("Pending message has been sent to next node
[msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next="
+ next.id() +
                                             ", res=" + res + ']');
 
                                     // Resetting timeout control object to create a new one
for the next bunch of
@@ -2405,6 +2408,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
 
+                                if (msg instanceof TcpDiscoveryCustomEventMessage)
+                                U.debug(log, "Message has been sent to next node [msg=" +
msg +
+                                        ", next=" + next.id() +
+                                        ", res=" + res + ']');
+
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" +
msg +
                                         ", next=" + next.id() +
@@ -3132,6 +3140,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
+                U.debug(log, "Added to joining: " + node.id());
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null &&
spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3236,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 n.visible(true);
                             }
 
+                            joiningNodes.clear();
+
                             locNode.setAttributes(node.attributes());
 
                             locNode.visible(true);
@@ -3573,6 +3585,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(leftNode.id());
 
+                U.debug(log, "removed from joining 3568: " + leftNode.id());
+
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3731,6 +3745,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(node.id());
 
+                U.debug(log, "removed from joining 3728: " + node.id());
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -4127,6 +4143,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (!joiningNodes.isEmpty()) {
                     pendingCustomMsgs.add(msg);
 
+                    U.debug(log, "Added to pending: " + msg);
+
                     return;
                 }
 
@@ -4138,6 +4156,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     notifyDiscoveryListener(msg);
 
+                    U.debug(log, "Verified: " + msg);
+
                     sndNext = true;
                 }
                 else
@@ -4171,22 +4191,40 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+                    U.debug(log, "Discarding custom message: " + msg);
+
+                    //addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
                 }
             }
             else {
-                if (msg.verified())
+                if (msg.verified()) {
+                    assert joiningNodes.isEmpty();
+
+                    U.debug(log, "Processing custom message: " + msg);
+
                     notifyDiscoveryListener(msg);
+                }
 
                 if (ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
             }
         }
 
+        long lastCheck = U.currentTimeMillis();
+
         /**
          * Checks and flushes custom event messages if no nodes are attempting to join the
grid.
          */
         private void checkPendingCustomMessages() {
+            if (lastCheck + 2000 < U.currentTimeMillis()) {
+                U.debug(
+                    log,
+                    "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId="
+ locNode.id() +
+                        ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes
+ ']');
+
+                lastCheck = U.currentTimeMillis();
+            }
+
             if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 4f3c9a9..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest
{
             stopAllGrids();
         }
     }
-}
\ No newline at end of file
+}


Mime
View raw message