ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [13/22] ignite git commit: fixed slow exchange on topology startup (cherry picked from commit c88182c)
Date Thu, 28 Apr 2016 11:28:14 GMT
fixed slow exchange on topology startup
(cherry picked from commit c88182c)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc71125d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc71125d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc71125d

Branch: refs/heads/ignite-2523-1
Commit: dc71125d23c03528f324458f0c6a14f3c34ab3ab
Parents: 122b0f4
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Thu Apr 21 16:15:02 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Wed Apr 27 12:41:43 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteEventsImpl.java       |  11 +-
 .../ignite/internal/IgniteMessagingImpl.java    |   7 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../continuous/CacheContinuousQueryManager.java |   1 +
 .../continuous/GridContinuousProcessor.java     |  29 ++-
 .../ignite/spi/IgniteNodeValidationResult.java  |   8 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 194 +++++++++++--------
 .../tcp/internal/TcpDiscoveryNode.java          |   2 +-
 .../messages/TcpDiscoveryAbstractMessage.java   |   4 +-
 .../TcpDiscoveryCustomEventMessage.java         |  13 +-
 .../TcpDiscoveryJoinRequestMessage.java         |  16 +-
 .../TcpDiscoveryStatusCheckMessage.java         |  18 +-
 12 files changed, 208 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 505bc9d..3c6218d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -108,9 +108,16 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents>
implemen
         guard();
 
         try {
+            GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID,
Event>)locLsnr,
+                (IgnitePredicate<Event>)rmtFilter, types);
+
             return saveOrGet(ctx.continuous().startRoutine(
-                new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
-                    (IgnitePredicate<Event>)rmtFilter, types), bufSize, interval, autoUnsubscribe,
prj.predicate()));
+                hnd,
+                false,
+                bufSize,
+                interval,
+                autoUnsubscribe,
+                prj.predicate()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 17c06fc..2800777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -184,7 +184,12 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
         try {
             GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID,
Object>)p);
 
-            return saveOrGet(ctx.continuous().startRoutine(hnd, 1, 0, false, prj.predicate()));
+            return saveOrGet(ctx.continuous().startRoutine(hnd,
+                false,
+                1,
+                0,
+                false,
+                prj.predicate()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f47b7ff..24249da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1278,7 +1278,7 @@ public class IgnitionEx {
     }
 
     /**
-     * Gets a name of the grid, which is owner of current thread. An Exception is thrown
if
+     * Gets the grid, which is owner of current thread. An Exception is thrown if
      * current thread is not an {@link IgniteThread}.
      *
      * @return Grid instance related to current thread

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9efc456..fafb830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -641,6 +641,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
 
         UUID id = cctx.kernalContext().continuous().startRoutine(
             hnd,
+            internal && loc,
             bufSize,
             timeInterval,
             autoUnsubscribe,

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d7838f3..fd5e446 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -540,11 +540,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param bufSize Buffer size.
      * @param interval Time interval.
      * @param autoUnsubscribe Automatic unsubscribe flag.
+     * @param locOnly Local only flag.
      * @param prjPred Projection predicate.
      * @return Future.
      */
     @SuppressWarnings("TooBroadScope")
     public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd,
+        boolean locOnly,
         int bufSize,
         long interval,
         boolean autoUnsubscribe,
@@ -553,12 +555,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         assert bufSize > 0;
         assert interval >= 0;
 
-        // Whether local node is included in routine.
-        boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
-
         // Generate ID.
         final UUID routineId = UUID.randomUUID();
 
+        // Register routine locally.
+        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
+
+        if (locOnly) {
+            try {
+                registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe,
true);
+
+                hnd.onListenerRegistered(routineId, ctx);
+
+                return new GridFinishedFuture<>(routineId);
+            }
+            catch (IgniteCheckedException e) {
+                unregisterHandler(routineId, hnd, true);
+
+                return new GridFinishedFuture<>(e);
+            }
+        }
+
+        // Whether local node is included in routine.
+        boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
+
         StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval,
autoUnsubscribe);
 
         try {
@@ -613,9 +633,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
         }
 
-        // Register routine locally.
-        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
-
         StartFuture fut = new StartFuture(ctx, routineId);
 
         startFuts.put(routineId, fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
index 2473a9e..3dd4caf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Result of joining node validation.
@@ -63,4 +64,9 @@ public class IgniteNodeValidationResult {
     public String sendMessage() {
         return sndMsg;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteNodeValidationResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3283d99..450f628 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2584,6 +2584,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                             finally {
                                 if (!success) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Closing socket to next: " + next);
+
                                     U.closeQuiet(sock);
 
                                     sock = null;
@@ -2747,6 +2750,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                             forceSndPending = false;
 
                             if (!sent) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Closing socket to next (not sent): " + next);
+
                                 U.closeQuiet(sock);
 
                                 sock = null;
@@ -2897,12 +2903,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          *
          * @param msg Join request message.
          */
-        private void processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) {
+        private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg)
{
             assert msg != null;
 
-            TcpDiscoveryNode node = msg.node();
+            final TcpDiscoveryNode node = msg.node();
 
-            UUID locNodeId = getLocalNodeId();
+            final UUID locNodeId = getLocalNodeId();
 
             if (!msg.client()) {
                 boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
@@ -3110,92 +3116,107 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
-                IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
+                final IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
 
                 if (err != null) {
-                    boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
+                    if (log.isDebugEnabled())
+                        log.debug("Node validation failed [res=" + err + ", node=" + node
+ ']');
 
-                    if (!ping) {
-                        if (log.isDebugEnabled())
-                            log.debug("Conflicting node has already left, need to wait for
event. " +
-                                "Will ignore join request for now since it will be recent
[req=" + msg +
-                                ", err=" + err.message() + ']');
+                    utilityPool.submit(
+                        new Runnable() {
+                            @Override public void run() {
+                                boolean ping = node.id().equals(err.nodeId()) ? pingNode(node)
: pingNode(err.nodeId());
 
-                        // Ignore join request.
-                        return;
-                    }
+                                if (!ping) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Conflicting node has already left, need
to wait for event. " +
+                                            "Will ignore join request for now since it will
be recent [req=" + msg +
+                                            ", err=" + err.message() + ']');
 
-                    LT.warn(log, null, err.message());
+                                    // Ignore join request.
+                                    return;
+                                }
 
-                    // Always output in debug.
-                    if (log.isDebugEnabled())
-                        log.debug(err.message());
+                                LT.warn(log, null, err.message());
 
-                    try {
-                        trySendMessageDirectly(node,
-                            new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
-                    }
-                    catch (IgniteSpiException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to send hash ID resolver validation failed
message to node " +
-                                "[node=" + node + ", err=" + e.getMessage() + ']');
+                                // Always output in debug.
+                                if (log.isDebugEnabled())
+                                    log.debug(err.message());
 
-                        onException("Failed to send hash ID resolver validation failed message
to node " +
-                            "[node=" + node + ", err=" + e.getMessage() + ']', e);
-                    }
+                                try {
+                                    trySendMessageDirectly(node,
+                                        new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
+                                }
+                                catch (IgniteSpiException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send hash ID resolver validation
failed message to node " +
+                                            "[node=" + node + ", err=" + e.getMessage() +
']');
+
+                                    onException("Failed to send hash ID resolver validation
failed message to node " +
+                                        "[node=" + node + ", err=" + e.getMessage() + ']',
e);
+                                }
+                            }
+                        }
+                    );
 
                     // Ignore join request.
                     return;
                 }
 
-                String locMarsh = locNode.attribute(ATTR_MARSHALLER);
-                String rmtMarsh = node.attribute(ATTR_MARSHALLER);
+                final String locMarsh = locNode.attribute(ATTR_MARSHALLER);
+                final String rmtMarsh = node.attribute(ATTR_MARSHALLER);
 
                 if (!F.eq(locMarsh, rmtMarsh)) {
-                    String errMsg = "Local node's marshaller differs from remote node's marshaller
" +
-                        "(to make sure all nodes in topology have identical marshaller, "
+
-                        "configure marshaller explicitly in configuration) " +
-                        "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
-                        ", locNodeAddrs=" + U.addressesAsString(locNode) +
-                        ", rmtNodeAddrs=" + U.addressesAsString(node) +
-                        ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
+                    utilityPool.submit(
+                        new Runnable() {
+                            @Override public void run() {
+                                String errMsg = "Local node's marshaller differs from remote
node's marshaller " +
+                                    "(to make sure all nodes in topology have identical marshaller,
" +
+                                    "configure marshaller explicitly in configuration) "
+
+                                    "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh
+
+                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
+                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
+                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
 
-                    LT.warn(log, null, errMsg);
+                                LT.warn(log, null, errMsg);
 
-                    // Always output in debug.
-                    if (log.isDebugEnabled())
-                        log.debug(errMsg);
+                                // Always output in debug.
+                                if (log.isDebugEnabled())
+                                    log.debug(errMsg);
 
-                    try {
-                        String sndMsg = "Local node's marshaller differs from remote node's
marshaller " +
-                            "(to make sure all nodes in topology have identical marshaller,
" +
-                            "configure marshaller explicitly in configuration) " +
-                            "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh
+
-                            ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort="
+ node.discoveryPort() +
-                            ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId="
+ node.id() +
-                            ", rmtNodeId=" + locNode.id() + ']';
-
-                        trySendMessageDirectly(node,
-                            new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
-                    }
-                    catch (IgniteSpiException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to send marshaller check failed message to
node " +
-                                "[node=" + node + ", err=" + e.getMessage() + ']');
+                                try {
+                                    String sndMsg = "Local node's marshaller differs from
remote node's marshaller " +
+                                        "(to make sure all nodes in topology have identical
marshaller, " +
+                                        "configure marshaller explicitly in configuration)
" +
+                                        "[locMarshaller=" + rmtMarsh + ", rmtMarshaller="
+ locMarsh +
+                                        ", locNodeAddrs=" + U.addressesAsString(node) + ",
locPort=" + node.discoveryPort() +
+                                        ", rmtNodeAddr=" + U.addressesAsString(locNode) +
", locNodeId=" + node.id() +
+                                        ", rmtNodeId=" + locNode.id() + ']';
+
+                                    trySendMessageDirectly(node,
+                                        new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
+                                }
+                                catch (IgniteSpiException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send marshaller check failed
message to node " +
+                                            "[node=" + node + ", err=" + e.getMessage() +
']');
 
-                        onException("Failed to send marshaller check failed message to node
" +
-                            "[node=" + node + ", err=" + e.getMessage() + ']', e);
-                    }
+                                    onException("Failed to send marshaller check failed message
to node " +
+                                        "[node=" + node + ", err=" + e.getMessage() + ']',
e);
+                                }
+                            }
+                        }
+                    );
 
                     // Ignore join request.
                     return;
                 }
 
                 // If node have no value for this attribute then we treat it as true.
-                Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+                final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
                 boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;
 
-                Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+                final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
                 boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid;
 
                 Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
@@ -3203,14 +3224,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;
 
                 if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
-                    String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID
+
-                        " property value differs from remote node's value " +
-                        "(to make sure all nodes in topology have identical marshaller settings,
" +
-                        "configure system property explicitly) " +
-                        "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid="
+ rmtMarshUseDfltSuid +
-                        ", locNodeAddrs=" + U.addressesAsString(locNode) +
-                        ", rmtNodeAddrs=" + U.addressesAsString(node) +
-                        ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
+                    utilityPool.submit(new Runnable() {
+                        @Override public void run() {
+                            String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID
+
+                                " property value differs from remote node's value " +
+                                "(to make sure all nodes in topology have identical marshaller
settings, " +
+                                "configure system property explicitly) " +
+                                "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid="
+ rmtMarshUseDfltSuid +
+                                ", locNodeAddrs=" + U.addressesAsString(locNode) +
+                                ", rmtNodeAddrs=" + U.addressesAsString(node) +
+                                ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
 
                     String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID
+
                         " property value differs from remote node's value " +
@@ -3230,19 +3253,22 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 // Validate compact footer flags.
                 Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
-                boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter
: false;
+                final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ?
locMarshCompactFooter : false;
 
                 Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
-                boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter
: false;
+                final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ?
rmtMarshCompactFooter : false;
 
                 if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
-                    String errMsg = "Local node's binary marshaller \"compactFooter\" property
differs from " +
-                        "the same property on remote node (make sure all nodes in topology
have the same value " +
-                        "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool
+
-                        ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
-                        ", locNodeAddrs=" + U.addressesAsString(locNode) +
-                        ", rmtNodeAddrs=" + U.addressesAsString(node) +
-                        ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
+                    utilityPool.submit(
+                        new Runnable() {
+                            @Override public void run() {
+                                String errMsg = "Local node's binary marshaller \"compactFooter\"
property differs from " +
+                                    "the same property on remote node (make sure all nodes
in topology have the same value " +
+                                    "of \"compactFooter\" property) [locMarshallerCompactFooter="
+ locMarshCompactFooterBool +
+                                    ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool
+
+                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
+                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
+                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId()
+ ']';
 
                     String sndMsg = "Local node's binary marshaller \"compactFooter\" property
differs from " +
                         "the same property on remote node (make sure all nodes in topology
have the same value " +
@@ -5871,6 +5897,16 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void addMessage(TcpDiscoveryAbstractMessage msg) {
+            if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
+                msg instanceof TcpDiscoveryJoinRequestMessage ||
+                msg instanceof TcpDiscoveryCustomEventMessage) &&
+                queue.contains(msg)) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring duplicate message: " + msg);
+
+                return;
+            }
+
             if (msg.highPriority())
                 queue.addFirst(msg);
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index d1fbecf..307aefe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -645,4 +645,4 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements
Cluste
     @Override public String toString() {
         return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 9cb47af..24f2a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -274,7 +274,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable
{
     }
 
     /** {@inheritDoc} */
-    @Override public final boolean equals(Object obj) {
+    @Override public boolean equals(Object obj) {
         if (this == obj)
             return true;
         else if (obj instanceof TcpDiscoveryAbstractMessage)
@@ -292,4 +292,4 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable
{
     @Override public String toString() {
         return S.toString(TcpDiscoveryAbstractMessage.class, this, "isClient", getFlag(CLIENT_FLAG_POS));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 2c759a1..c0e39d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.NotNull;
@@ -86,7 +86,16 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return super.equals(obj) &&
+            obj instanceof TcpDiscoveryCustomEventMessage &&
+            F.eq(
+                ((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(),
+                verifierNodeId());
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 2586a8b..22ffae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.Map;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 
@@ -79,7 +80,20 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        // NOTE!
+        // Do not call super. As IDs will differ, but we can ignore this.
+
+        if (!(obj instanceof TcpDiscoveryJoinRequestMessage))
+            return false;
+
+        TcpDiscoveryJoinRequestMessage other = (TcpDiscoveryJoinRequestMessage)obj;
+
+        return F.eqNodes(other.node, node);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71125d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index 70b0080..fdbeb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 
@@ -109,7 +110,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        // NOTE!
+        // Do not call super. As IDs will differ, but we can ignore this.
+
+        if (!(obj instanceof TcpDiscoveryStatusCheckMessage))
+            return false;
+
+        TcpDiscoveryStatusCheckMessage other = (TcpDiscoveryStatusCheckMessage)obj;
+
+        return F.eqNodes(other.creatorNode, creatorNode) &&
+            F.eq(other.failedNodeId, failedNodeId) &&
+            status == other.status;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());
     }
-}
\ No newline at end of file
+}


Mime
View raw message