ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [10/38] ignite git commit: IGNITE-3060 Discovery: optimize resource usage for client connections
Date Wed, 18 May 2016 10:58:16 GMT
IGNITE-3060 Discovery: optimize resource usage for client connections


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83242336
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83242336
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83242336

Branch: refs/heads/ignite-3165
Commit: 83242336f5a3c0f067cfac9ed138af59970b5a92
Parents: 1f8b394
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Apr 29 09:57:10 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Apr 29 09:57:10 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 48 +++++++++++---------
 .../TcpDiscoveryClientReconnectMessage.java     | 16 +++++++
 2 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/83242336/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 43f4b58..e30dd24 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2168,7 +2168,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         void addMessage(TcpDiscoveryAbstractMessage msg) {
             if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
                 msg instanceof TcpDiscoveryJoinRequestMessage ||
-                msg instanceof TcpDiscoveryCustomEventMessage) &&
+                msg instanceof TcpDiscoveryCustomEventMessage ||
+                msg instanceof TcpDiscoveryClientReconnectMessage) &&
                 queue.contains(msg)) {
                 if (log.isDebugEnabled())
                     log.debug("Ignoring duplicate message: " + msg);
@@ -2290,9 +2291,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (msg instanceof TcpDiscoveryNodeFailedMessage)
                 processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
 
-            else if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
-                processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg);
-
             else if (msg instanceof TcpDiscoveryHeartbeatMessage)
                 processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
 
@@ -4521,22 +4519,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Processes client heartbeat message.
-         *
-         * @param msg Heartbeat message.
-         */
-        private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
-            assert msg.client();
-
-            ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());
-
-            if (wrk != null)
-                wrk.metrics(msg.metrics());
-            else if (log.isDebugEnabled())
-                log.debug("Received heartbeat message from unknown client node: " + msg);
-        }
-
-        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.
@@ -5436,7 +5418,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                             continue;
                         }
 
-                        msgWorker.addMessage(msg);
+                        TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+
+                        if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+                            heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
+                        else
+                            msgWorker.addMessage(msg);
 
                         // Send receipt back.
                         if (clientMsgWrk != null) {
@@ -5448,6 +5435,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else
                             spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+                        if (heartbeatMsg != null)
+                            processClientHeartbeatMessage(heartbeatMsg);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -5515,6 +5505,22 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Processes client heartbeat message.
+         *
+         * @param msg Heartbeat message.
+         */
+        private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
+            assert msg.client();
+
+            ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());
+
+            if (wrk != null)
+                wrk.metrics(msg.metrics());
+            else if (log.isDebugEnabled())
+                log.debug("Received heartbeat message from unknown client node: " + msg);
+        }
+
+        /**
          * @param msg Join request message.
          * @param clientMsgWrk Client message worker to start.
          * @return Whether connection was successful.

http://git-wip-us.apache.org/repos/asf/ignite/blob/83242336/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index 7c0cd5d..7cce78b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -96,6 +97,21 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        // NOTE!
+        // Do not call super. As IDs will differ, but we can ignore this.
+
+        if (!(obj instanceof TcpDiscoveryClientReconnectMessage))
+            return false;
+
+        TcpDiscoveryClientReconnectMessage other = (TcpDiscoveryClientReconnectMessage)obj;
+
+        return F.eq(creatorNodeId(), other.creatorNodeId()) &&
+            F.eq(routerNodeId, other.routerNodeId) &&
+            F.eq(lastMsgId, other.lastMsgId);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryClientReconnectMessage.class, this, "super", super.toString());
     }


Mime
View raw message