ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: 1171-debug - Fix WIP.
Date Tue, 22 Sep 2015 22:23:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1171-debug 271b7501c -> 47f9605fa


1171-debug - Fix WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47f9605f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47f9605f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47f9605f

Branch: refs/heads/ignite-1171-debug
Commit: 47f9605faee1972750efed4c4db7a9b36bb8f3b5
Parents: 271b750
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Tue Sep 22 15:23:20 2015 -0700
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Tue Sep 22 15:23:20 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 168 ++++++++++++-------
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  14 --
 3 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e5090cb..e730edc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1022,6 +1022,9 @@ public abstract class IgniteUtils {
      */
     @Deprecated
     public static void debug(IgniteLogger log, String msg) {
+        if (true)
+            return;
+
         log.info(msg);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3d624d8..06a6bb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1377,7 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         UUID destNodeId,
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
         @Nullable IgniteUuid discardCustomMsgId
         ) {
         assert destNodeId != null;
@@ -1403,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
+                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1426,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
-            nodeAddedMsg.messages(null, null, null, null);
+            nodeAddedMsg.messages(null, null, null);
         }
     }
 
@@ -1835,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID
destNodeId) {
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
+                prepareNodeAddedMessage(msg, destNodeId, null, null, null);
 
             return msg;
         }
@@ -1851,9 +1850,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX
* 2);
 
-        /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX
* 2);
-
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX
* 2);
 
@@ -1870,12 +1866,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage
? customMsgs : msgs;
+            msgs.add(msg);
 
-            msgs0.add(msg);
-
-            while (msgs0.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs0.poll();
+            while (msgs.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs.poll();
 
                 assert polled != null;
 
@@ -1890,25 +1884,18 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgs Message.
          * @param discardId Discarded message ID.
          */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable
IgniteUuid discardId,
-            @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable
IgniteUuid duscardCustomId) {
+        void reset(
+            @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+            @Nullable IgniteUuid discardId,
+            @Nullable IgniteUuid customDiscardId
+        ) {
             this.msgs.clear();
-            this.customMsgs.clear();
 
-            if (msgs != null) {
-                // Backward compatibility: old nodes send messages in one collection.
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (msg instanceof TcpDiscoveryCustomEventMessage)
-                        this.customMsgs.add(msg);
-                    else
-                        this.msgs.add(msg);
-                }
-            }
-
-            if (customMsgs != null)
-                this.customMsgs.addAll(customMsgs);
+            if (msgs != null)
+                this.msgs.addAll(msgs);
 
             this.discardId = discardId;
+            this.customDiscardId = customDiscardId;
         }
 
         /**
@@ -1929,31 +1916,86 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @return Non-discarded messages iterator.
          */
         public Iterator<TcpDiscoveryAbstractMessage> iterator() {
-            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            return new SkipIterator();
+        }
 
-            if (discardId != null) {
-                while (msgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg = msgIt.next();
+        /**
+         *
+         */
+        private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage>
{
+            /** Skip non-custom messages flag. */
+            private boolean skipMsg = discardId != null;
 
-                    // Skip all messages before discarded, inclusive.
-                    if (discardId.equals(msg.id()))
-                        break;
-                }
+            /** Skip custom messages flag. */
+            private boolean skipCustomMsg;
+
+            /** Internal iterator. */
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+            /** Next message. */
+            private TcpDiscoveryAbstractMessage next;
+
+            {
+                advance();
             }
 
-            Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return next != null;
+            }
 
-            if (customDiscardId != null) {
-                while (customMsgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+            /** {@inheritDoc} */
+            @Override public TcpDiscoveryAbstractMessage next() {
+                if (next == null)
+                    throw new NoSuchElementException();
 
-                    // Skip all messages before discarded, inclusive.
-                    if (customDiscardId.equals(msg.id()))
-                        break;
-                }
+                TcpDiscoveryAbstractMessage next0 = next;
+
+                advance();
+
+                return next0;
             }
 
-            return F.concat(msgIt, customMsgIt);
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            /**
+             * Advances iterator to the next available item.
+             */
+            private void advance() {
+                next = null;
+
+                while (msgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+                    if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                        if (skipCustomMsg) {
+                            assert customDiscardId != null;
+
+                            if (F.eq(customDiscardId, msg0.id()))
+                                skipCustomMsg = false;
+
+                            continue;
+                        }
+                    }
+                    else {
+                        if (skipMsg) {
+                            assert discardId != null;
+
+                            if (F.eq(discardId, msg0.id()))
+                                skipMsg = false;
+
+                            continue;
+                        }
+                    }
+
+                    next = msg0;
+
+                    break;
+                }
+            }
         }
     }
 
@@ -2044,9 +2086,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ",
id=" + msg.id() + ']');
 
-            U.debug(
-                log,
-                "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id()
+ ']');
+            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+                U.debug(
+                    log,
+                    "Processing message [locNodeId=" + locNode.id() + ", cls=" + msg.getClass().getSimpleName()
+ ", id=" + msg.id() + ']');
 
             if (debugMode)
                 debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ",
id=" + msg.id() + ']');
@@ -2399,7 +2442,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
-                                        pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                                        pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2447,7 +2490,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                             else
                                 prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs,
pendingMsgs.discardId,
-                                    pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                                    pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -2467,6 +2510,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Message has been sent to next node [msg="
+ msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
+                                U.debug(log, "Message has been sent to next node [msg=" +
msg +
+                                    ", next=" + next.id() +
+                                    ", res=" + res + ']');
 
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" +
msg +
@@ -2588,7 +2634,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                         prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
pendingMsgs.discardId,
-                            pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                            pendingMsgs.customDiscardId);
 
                         msgWorker.addMessage(pendingMsg);
 
@@ -3314,10 +3360,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.putAll(msg.topologyHistory());
 
                             pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
-                                msg.customMessages(), msg.discardedCustomMessageId());
+                                msg.discardedCustomMessageId());
 
                             // Clear data to minimize message size.
-                            msg.messages(null, null, null, null);
+                            msg.messages(null, null, null);
                             msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
@@ -3423,7 +3469,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
 
-            if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy()
== CONNECTED && fireEvt) {
+            TcpDiscoverySpiState state = spiStateCopy();
+
+            if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING
&& fireEvt) {
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3438,7 +3486,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     boolean b = ring.topologyVersion(topVer);
 
                     assert b : "Topology version has not been updated: [ring=" + ring + ",
msg=" + msg +
-                        ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
+                        ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
 
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ",
msg=" + msg + ']');
@@ -3450,7 +3498,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     lastMsg = msg;
                 }
 
-                notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+                if (state == CONNECTED)
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, node);
 
                 try {
                     if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3466,7 +3515,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy()
== CONNECTING) {
+            if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING)
{
                 assert node != null;
 
                 assert topVer > 0 : "Invalid topology version: " + msg;
@@ -4204,7 +4253,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            U.debug(log, "Processing custom message: " + msg);
+            U.debug(log, "Processing custom message [msg=" + msg + ", topVer=" + ring.topologyVersion()
+ ']');
 
             if (isLocalNodeCoordinator()) {
                 if (!joiningNodes.isEmpty()) {
@@ -5229,7 +5278,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Redirecting message to client [sock=" + sock + ",
locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg="
+ msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
 
                         writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
@@ -5398,6 +5447,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (log.isDebugEnabled())
                 log.debug("Message has been added to queue: " + msg);
+
+            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+                U.debug(log, "Message has been added to queue: " + msg);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 789f2b9..5a7146d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,9 +48,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     /** Discarded message ID. */
     private IgniteUuid discardMsgId;
 
-    /** Pending messages from previous node. */
-    private Collection<TcpDiscoveryAbstractMessage> customMsgs;
-
     /** Discarded message ID. */
     private IgniteUuid discardCustomMsgId;
 
@@ -123,15 +120,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
-     * Gets pending cusotm messages sent to new node by its previous.
-     *
-     * @return Pending messages from previous node.
-     */
-    @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
-        return customMsgs;
-    }
-
-    /**
      * Gets discarded custom message ID.
      *
      * @return Discarded message ID.
@@ -149,12 +137,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     public void messages(
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
         @Nullable IgniteUuid discardCustomMsgId
     ) {
         this.msgs = msgs;
         this.discardMsgId = discardMsgId;
-        this.customMsgs = customMsgs;
         this.discardCustomMsgId = discardCustomMsgId;
     }
 


Mime
View raw message