ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject incubator-ignite git commit: # IGNITE-719 Client disco should support custom discovery event.
Date Fri, 10 Apr 2015 14:25:31 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709 1239a66fc -> ec8e09951


# IGNITE-719 Client disco should support custom discovery event.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ec8e0995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ec8e0995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ec8e0995

Branch: refs/heads/ignite-709
Commit: ec8e099516f905050129d71c6e29f79b98df50e4
Parents: 1239a66
Author: sevdokimov <sergey.evdokimov@jetbrains.com>
Authored: Fri Apr 10 17:25:17 2015 +0300
Committer: sevdokimov <sergey.evdokimov@jetbrains.com>
Committed: Fri Apr 10 17:25:17 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 65 +++++++++++++++++++-
 .../TcpDiscoveryCustomEventMessage.java         |  1 +
 2 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec8e0995/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 2258e27..7920810 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -27,7 +27,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.jetbrains.annotations.*;
@@ -40,6 +40,7 @@ import java.util.concurrent.*;
 
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
 
 /**
@@ -376,7 +377,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(Serializable evt) {
-        throw new UnsupportedOperationException();
+        sockRdr.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt));
     }
 
     /**
@@ -858,6 +859,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                         processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
                     else if (msg instanceof TcpDiscoveryHeartbeatMessage)
                         processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+                    else if (msg instanceof TcpDiscoveryCustomEventMessage)
+                        processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
 
                     if (ensured(msg))
                         lastMsgId = msg.id();
@@ -1153,6 +1156,51 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         }
 
         /**
+         * @param msg Message.
+         */
+        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            if (msg.verified()) {
+                DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
+
+                if (lsnr != null) {
+                    TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+                    if (node != null && node.visible())
+                        notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msg.message());
+                    else if (log.isDebugEnabled())
+                        log.debug("Received metrics from unknown node: " + nodeId);
+                }
+            }
+            else {
+                if (getLocalNodeId().equals(msg.creatorNodeId())) {
+                    Socket sock0 = sock;
+
+                    if (sock0 != null) {
+                        try {
+                            writeToSocket(sock0, msg);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
+                        }
+                        catch (IOException | IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                U.error(log, "Failed to send custom message [sock=" + sock0 +
+                                    ", msg=" + msg + ']', e);
+
+                            U.closeQuiet(sock0);
+
+                            sock = null;
+
+                            interrupt();
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to send custom message (node is disconnected): " + msg);
+                }
+            }
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param tstamp Timestamp.
@@ -1222,6 +1270,17 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param top Topology snapshot.
          */
         private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+            notifyDiscovery(type, topVer, node, top, null);
+        }
+
+        /**
+         * @param type Event type.
+         * @param topVer Topology version.
+         * @param node Node.
+         * @param top Topology snapshot.
+         */
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
+            @Nullable Serializable data) {
             DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
 
             if (lsnr != null) {
@@ -1229,7 +1288,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
                         ", topVer=" + topVer + ']');
 
-                lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), null);
+                lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
             }
             else if (log.isDebugEnabled())
                 log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec8e0995/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 234efaa..820adff 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -23,6 +23,7 @@ import java.util.*;
 /**
  * Wrapped for custom message.
  */
+@TcpDiscoveryRedirectToClient
 @TcpDiscoveryEnsureDelivery
 public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage {
     /** */


Mime
View raw message