ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/17] ignite git commit: IGNITE-3060 Discovery: optimize resource usage for client connections (cherry picked from commits 7b0edfb, 8324233)
Date Wed, 04 May 2016 09:08:30 GMT
IGNITE-3060 Discovery: optimize resource usage for client connections
(cherry picked from commits 7b0edfb, 8324233)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0daea95
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0daea95
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0daea95

Branch: refs/heads/ignite-3024
Commit: f0daea95dfbb1d6aca3a722773001c5ec0e0f5d6
Parents: fc0dd9a
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Apr 29 09:57:10 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 4 09:12:24 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteComponentType.java    |   4 +-
 .../continuous/CacheContinuousQueryHandler.java |   9 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 214 ++++++++++++-------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   5 +-
 .../TcpDiscoveryClientHeartbeatMessage.java     |   1 +
 .../TcpDiscoveryClientReconnectMessage.java     |  16 ++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  44 +++-
 7 files changed, 202 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index 01872b6..76e495f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -285,7 +285,7 @@ public enum IgniteComponentType {
                 return (T)ctor.newInstance(ctx);
             }
         }
-        catch (Exception e) {
+        catch (Throwable e) {
             throw componentException(e);
         }
     }
@@ -309,7 +309,7 @@ public enum IgniteComponentType {
      * @param err Creation error.
      * @return Component creation exception.
      */
-    private IgniteCheckedException componentException(Exception err) {
+    private IgniteCheckedException componentException(Throwable err) {
         return new IgniteCheckedException("Failed to create Ignite component (consider adding " + module +
             " module to classpath) [component=" + this + ", cls=" + clsName + ']', err);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 9ae2972..3b77d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -638,6 +638,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
+        if (cctx == null) {
+            IgniteLogger log = ctx.log(CacheContinuousQueryHandler.class);
+
+            if (log.isDebugEnabled())
+                log.debug("Failed to notify callback, cache is not found: " + cacheId);
+
+            return;
+        }
+
         final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size());
 
         for (CacheContinuousQueryEntry e : entries) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 05bb1e6..84400ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -162,7 +162,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
  */
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
-    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
+    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024);
 
     /** */
     private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
@@ -1412,6 +1412,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 ", topSize=" + ring.allNodes().size() +
                 ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize +
                 ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
+                ", clients=" + ring.clientNodes().size() +
+                ", clientWorkers=" + clientMsgWorkers.size() +
                 ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") +
                 ", heapFree=" + runtime.freeMemory() / (1024 * 1024) +
                 "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]");
@@ -2114,7 +2116,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Message worker thread for messages processing.
      */
-    private class RingMessageWorker extends MessageWorkerAdapter {
+    private class RingMessageWorker extends MessageWorkerAdapter<TcpDiscoveryAbstractMessage> {
         /** Next node. */
         @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
         private TcpDiscoveryNode next;
@@ -2172,6 +2174,32 @@ class ServerImpl extends TcpDiscoveryImpl {
             initConnectionCheckFrequency();
         }
 
+        /**
+         * Adds message to queue.
+         *
+         * @param msg Message to add.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
+                msg instanceof TcpDiscoveryJoinRequestMessage ||
+                msg instanceof TcpDiscoveryCustomEventMessage ||
+                msg instanceof TcpDiscoveryClientReconnectMessage) &&
+                queue.contains(msg)) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring duplicate message: " + msg);
+
+                return;
+            }
+
+            if (msg.highPriority())
+                queue.addFirst(msg);
+            else
+                queue.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to queue: " + msg);
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             try {
@@ -2277,9 +2305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (msg instanceof TcpDiscoveryNodeFailedMessage)
                 processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
 
-            else if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
-                processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg);
-
             else if (msg instanceof TcpDiscoveryHeartbeatMessage)
                 processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
 
@@ -2333,28 +2358,44 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
-                byte[] marshalledMsg = null;
+                byte[] msgBytes = null;
 
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
-                    // Send a clone to client to avoid ConcurrentModificationException
-                    TcpDiscoveryAbstractMessage msgClone;
-
-                    try {
-                        if (marshalledMsg == null)
-                            marshalledMsg = spi.marsh.marshal(msg);
+                    if (msgBytes == null) {
+                        try {
+                            msgBytes = spi.marsh.marshal(msg);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to marshal message: " + msg, e);
 
-                        msgClone = spi.marsh.unmarshal(marshalledMsg,
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                            break;
+                        }
                     }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to marshal message: " + msg, e);
 
-                        msgClone = msg;
-                    }
+                    TcpDiscoveryAbstractMessage msg0 = msg;
+                    byte[] msgBytes0 = msgBytes;
 
-                    prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null);
+                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                        TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                        TcpDiscoveryNode node = nodeAddedMsg.node();
+
+                        if (clientMsgWorker.clientNodeId.equals(node.id())) {
+                            try {
+                                msg0 = spi.marsh.unmarshal(msgBytes,
+                                    U.resolveClassLoader(spi.ignite().configuration()));
+
+                                prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);
 
-                    clientMsgWorker.addMessage(msgClone);
+                                msgBytes0 = null;
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to create message copy: " + msg, e);
+                            }
+                        }
+                    }
+
+                    clientMsgWorker.addMessage(msg0, msgBytes0);
                 }
             }
         }
@@ -4528,22 +4569,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Processes client heartbeat message.
-         *
-         * @param msg Heartbeat message.
-         */
-        private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
-            assert msg.client();
-
-            ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());
-
-            if (wrk != null)
-                wrk.metrics(msg.metrics());
-            else if (log.isDebugEnabled())
-                log.debug("Received heartbeat message from unknown client node: " + msg);
-        }
-
-        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.
@@ -5448,7 +5473,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                             continue;
                         }
 
-                        msgWorker.addMessage(msg);
+                        TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+
+                        if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+                            heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
+                        else
+                            msgWorker.addMessage(msg);
 
                         // Send receipt back.
                         if (clientMsgWrk != null) {
@@ -5460,6 +5490,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else
                             spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+                        if (heartbeatMsg != null)
+                            processClientHeartbeatMessage(heartbeatMsg);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -5527,6 +5560,22 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Processes client heartbeat message.
+         *
+         * @param msg Heartbeat message.
+         */
+        private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
+            assert msg.client();
+
+            ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());
+
+            if (wrk != null)
+                wrk.metrics(msg.metrics());
+            else if (log.isDebugEnabled())
+                log.debug("Received heartbeat message from unknown client node: " + msg);
+        }
+
+        /**
          * @param msg Join request message.
          * @param clientMsgWrk Client message worker to start.
          * @return Whether connection was successful.
@@ -5653,16 +5702,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /**
      */
-    private class ClientMessageWorker extends MessageWorkerAdapter {
+    private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage, byte[]>> {
         /** Node ID. */
         private final UUID clientNodeId;
 
         /** Socket. */
         private final Socket sock;
 
-        /** Output stream. */
-        private final OutputStream out;
-
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
@@ -5681,8 +5727,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
-
-            out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
         }
 
         /**
@@ -5706,11 +5750,43 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.metrics = metrics;
         }
 
+        /**
+         * @param msg Message.
+         */
+        public void addMessage(TcpDiscoveryAbstractMessage msg) {
+            addMessage(msg, null);
+        }
+
+        /**
+         * @param msg Message.
+         * @param msgBytes Optional message bytes.
+         */
+        public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
+            T2 t = new T2<>(msg, msgBytes);
+
+            if (msg.highPriority())
+                queue.addFirst(t);
+            else
+                queue.add(t);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to client queue: " + msg);
+        }
+
         /** {@inheritDoc} */
-        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+        @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
+            boolean success = false;
+
+            TcpDiscoveryAbstractMessage msg = msgT.get1();
+
             try {
                 assert msg.verified() : msg;
 
+                byte[] msgBytes = msgT.get2();
+
+                if (msgBytes == null)
+                    msgBytes = spi.marsh.marshal(msg);
+
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
                     if (clientVer == null) {
                         ClusterNode node = spi.getNode(clientNodeId);
@@ -5729,7 +5805,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
@@ -5740,9 +5816,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     assert topologyInitialized(msg) : msg;
 
-                    spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
+                    spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
+
+                success = true;
             }
             catch (IgniteCheckedException | IOException e) {
                 if (log.isDebugEnabled())
@@ -5751,12 +5829,15 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 onException("Client connection failed [sock=" + sock + ", locNodeId="
                     + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
+            }
+            finally {
+                if (!success) {
+                    clientMsgWorkers.remove(clientNodeId, this);
 
-                clientMsgWorkers.remove(clientNodeId, this);
-
-                U.interrupt(this);
+                    U.interrupt(this);
 
-                U.closeQuiet(sock);
+                    U.closeQuiet(sock);
+                }
             }
         }
 
@@ -5846,9 +5927,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Base class for message workers.
      */
-    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+    protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread {
         /** Message queue. */
-        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+        protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
 
         /** Backed interrupted flag. */
         private volatile boolean interrupted;
@@ -5874,7 +5955,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
+                T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
                 if (msg == null)
                     noMessageLoop();
@@ -5903,34 +5984,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Adds message to queue.
-         *
-         * @param msg Message to add.
-         */
-        void addMessage(TcpDiscoveryAbstractMessage msg) {
-            if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
-                msg instanceof TcpDiscoveryJoinRequestMessage ||
-                msg instanceof TcpDiscoveryCustomEventMessage) &&
-                queue.contains(msg)) {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring duplicate message: " + msg);
-
-                return;
-            }
-
-            if (msg.highPriority())
-                queue.addFirst(msg);
-            else
-                queue.add(msg);
-
-            if (log.isDebugEnabled())
-                log.debug("Message has been added to queue: " + msg);
-        }
-
-        /**
          * @param msg Message.
          */
-        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+        protected abstract void processMessage(T msg);
 
         /**
          * Called when there is no message to process giving ability to perform other activity.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 4351c64..c135b83 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1267,7 +1267,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
 
-        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
+        writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
 
         return sock;
     }
@@ -1297,12 +1297,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * Writes message to the socket.
      *
      * @param sock Socket.
+     * @param msg Message.
      * @param data Raw data to write.
      * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
         assert sock != null;
         assert data != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
index 37b578c..3993de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
@@ -38,6 +38,7 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess
      * Constructor.
      *
      * @param creatorNodeId Creator node.
+     * @param metrics Metrics.
      */
     public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) {
         super(creatorNodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index 7c0cd5d..7cce78b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -96,6 +97,21 @@ public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMess
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        // NOTE!
+        // Do not call super. As IDs will differ, but we can ignore this.
+
+        if (!(obj instanceof TcpDiscoveryClientReconnectMessage))
+            return false;
+
+        TcpDiscoveryClientReconnectMessage other = (TcpDiscoveryClientReconnectMessage)obj;
+
+        return F.eq(creatorNodeId(), other.creatorNodeId()) &&
+            F.eq(routerNodeId, other.routerNodeId) &&
+            F.eq(lastMsgId, other.lastMsgId);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryClientReconnectMessage.class, this, "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0daea95/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index e01094c..331b581 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1736,8 +1736,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         final AtomicBoolean err = new AtomicBoolean(false);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
+            @Override public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                     log.info("Disconnected event.");
 
@@ -2158,17 +2157,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
+            if (!onMessage(sock, msg))
+                return;
+
+            super.writeToSocket(sock, out, msg, timeout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] msgBytes,
+            long timeout) throws IOException {
+            waitFor(writeLock);
+
+            if (!onMessage(sock, msg))
+                return;
+
+            super.writeToSocket(sock, msg, msgBytes, timeout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
+        }
+
+        /**
+         * @param sock Socket.
+         * @param msg Message.
+         * @return {@code False} if should not further process message.
+         * @throws IOException If failed.
+         */
+        private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException {
             boolean fail = false;
 
             if (skipNodeAdded &&
                 (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) {
                 log.info("Skip message: " + msg);
 
-                return;
+                return false;
             }
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
@@ -2184,10 +2215,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
-
-            if (afterWrite != null)
-                afterWrite.apply(msg, sock);
+            return true;
         }
 
         /** {@inheritDoc} */


Mime
View raw message