ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/22] ignite git commit: TcpDiscovery: reduced amount of debug logging (heartbeat/connection check messages are logged at trace level).
Date Mon, 31 Oct 2016 13:48:22 GMT
TcpDiscovery: reduced amount of debug logging (heartbeat/connection check messages are logged
at trace level).


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b78ad0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b78ad0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b78ad0c

Branch: refs/heads/ignite-4154
Commit: 6b78ad0cbbcf286cb083136c49cebd5dd85de58c
Parents: 6f16072
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 31 10:35:44 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 31 10:35:44 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  10 +-
 .../processors/cache/GridCacheUtils.java        |  67 --------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  12 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 164 +++++++++++++------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  59 ++++++-
 .../messages/TcpDiscoveryAbstractMessage.java   |   7 +
 .../messages/TcpDiscoveryClientAckResponse.java |   5 +
 .../TcpDiscoveryClientHeartbeatMessage.java     |   7 +-
 .../TcpDiscoveryConnectionCheckMessage.java     |   5 +
 .../messages/TcpDiscoveryHeartbeatMessage.java  |   5 +
 10 files changed, 211 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4eb61e3..a901e2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -718,7 +718,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * Partition refresh callback.
      */
-    void refreshPartitions() {
+    private void refreshPartitions() {
         ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -735,7 +735,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         // If this is the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
-            rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
+            GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;
+
+            // No need to send to nodes which did not finish their first exchange.
+            AffinityTopologyVersion rmtTopVer =
+                lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
+
+            rmts = CU.remoteNodes(cctx, rmtTopVer);
 
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 4c18f21..90e428c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -410,23 +410,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets public cache name substituting null name by {@code 'default'}.
-     *
-     * @return Public cache name substituting null name by {@code 'default'}.
-     */
-    public static String namexx(@Nullable String name) {
-        return name == null ? "default" : name;
-    }
-
-    /**
-     * @return Partition to state transformer.
-     */
-    @SuppressWarnings({"unchecked"})
-    public static IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state()
{
-        return PART2STATE;
-    }
-
-    /**
      * Gets all nodes on which cache with the same name is started.
      *
      * @param ctx Cache context.
@@ -462,18 +445,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets alive remote nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext
ctx,
-        AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder);
-    }
-
-    /**
      * Gets all nodes on which cache with the same name is started and the local DHT storage
is enabled.
      *
      * @param ctx Cache context.
@@ -644,44 +615,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets type filter for projections.
-     *
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @param <K> Key type.
-     * @param <V> Value type.
-     * @return Type filter.
-     */
-    public static <K, V> IgniteBiPredicate<K, V> typeFilter(final Class<?>
keyType, final Class<?> valType) {
-        return new P2<K, V>() {
-            @Override public boolean apply(K k, V v) {
-                return keyType.isAssignableFrom(k.getClass()) && valType.isAssignableFrom(v.getClass());
-            }
-
-            @Override public String toString() {
-                return "Type filter [keyType=" + keyType + ", valType=" + valType + ']';
-            }
-        };
-    }
-
-    /**
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @return Type filter.
-     */
-    public static CacheEntryPredicate typeFilter0(final Class<?> keyType, final Class<?>
valType) {
-        return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() {
-            @Override public boolean apply(GridCacheEntryEx e) {
-                Object val = CU.value(peekVisibleValue(e), e.context(), false);
-
-                return val == null ||
-                    valType.isAssignableFrom(val.getClass()) &&
-                    keyType.isAssignableFrom(e.key().value(e.context().cacheObjectContext(),
false).getClass());
-            }
-        });
-    }
-
-    /**
      * @return Boolean reducer.
      */
     public static IgniteReducer<Boolean, Boolean> boolReducer() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 2d948da..f929121 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -929,8 +929,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         msg.senderNodeId(rmtNodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Message has been received: " + msg);
+                        DebugLogger debugLog = messageLogger(msg);
+
+                        if (debugLog.isDebugEnabled())
+                            debugLog.debug("Message has been received: " + msg);
 
                         spi.stats.onMessageReceived(msg);
 
@@ -2079,6 +2081,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             @Nullable DiscoverySpiCustomMessage data) {
             DiscoverySpiListener lsnr = spi.lsnr;
 
+            DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : debugLog;
+
             if (lsnr != null) {
                 if (log.isDebugEnabled())
                     log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type)
+
@@ -2094,14 +2098,14 @@ class ClientImpl extends TcpDiscoveryImpl {
         /**
          * @param msg Message.
          */
-        public void addMessage(Object msg) {
+        void addMessage(Object msg) {
             queue.add(msg);
         }
 
         /**
          * @return Queue size.
          */
-        public int queueSize() {
+        int queueSize() {
             return queue.size();
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 78a5f39..55e5c89 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -214,6 +214,12 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Leaving nodes (but still in topology). */
     private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
 
+    /** Collection to track joining nodes. */
+    private Set<UUID> joiningNodes = new HashSet<>();
+
+    /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished
messages. */
+    private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
+
     /** If non-shared IP finder is used this flag shows whether IP finder contains local
address. */
     private boolean ipFinderHasLocAddr;
 
@@ -296,7 +302,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 throw new IgniteSpiException("Info log level should be enabled for TCP discovery
to work " +
                     "in debug mode.");
 
-            debugLog = new ConcurrentLinkedDeque<>();
+            debugLogQ = new ConcurrentLinkedDeque<>();
 
             U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
         }
@@ -1288,6 +1294,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         TcpDiscoverySpiState spiState = spiStateCopy();
 
+        DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : debugLog;
+
         if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState
== DISCONNECTING)) {
             if (log.isDebugEnabled())
                 log.debug("Discovery notification [node=" + node + ", spiState=" + spiState
+
@@ -1410,10 +1418,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         if (log.isInfoEnabled() && spi.statsPrintFreq > 0) {
             int failedNodesSize;
             int leavingNodesSize;
+            int joiningNodesSize;
+            int pendingCustomMsgsSize;
 
             synchronized (mux) {
                 failedNodesSize = failedNodes.size();
                 leavingNodesSize = leavingNodes.size();
+                joiningNodesSize = joiningNodes.size();
+                pendingCustomMsgsSize = pendingCustomMsgs.size();
             }
 
             Runtime runtime = Runtime.getRuntime();
@@ -1422,8 +1434,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             log.info("Discovery SPI statistics [statistics=" + spi.stats + ", spiState="
+ spiStateCopy() +
                 ", coord=" + coord +
+                ", next=" + (msgWorker != null ? msgWorker.next : "N/A") +
+                ", intOrder=" + (locNode != null ? locNode.internalOrder() : "N/A") +
                 ", topSize=" + ring.allNodes().size() +
-                ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize
+
+                ", leavingNodesSize=" + leavingNodesSize +
+                ", failedNodesSize=" + failedNodesSize +
+                ", joiningNodesSize=" + joiningNodesSize +
+                ", pendingCustomMsgs=" + pendingCustomMsgsSize +
                 ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() :
"N/A") +
                 ", clients=" + ring.clientNodes().size() +
                 ", clientWorkers=" + clientMsgWorkers.size() +
@@ -1612,7 +1629,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             b.append("In-memory log messages: ").append(U.nl());
 
-            for (String msg : debugLog)
+            for (String msg : debugLogQ)
                 b.append("    ").append(msg).append(U.nl());
 
             b.append(U.nl());
@@ -2177,12 +2194,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
-        /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished
messages. */
-        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
-
-        /** Collection to track joining nodes. */
-        private Set<UUID> joiningNodes = new HashSet<>();
-
         /**
          */
         protected RingMessageWorker() {
@@ -2197,6 +2208,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void addMessage(TcpDiscoveryAbstractMessage msg) {
+            DebugLogger log = messageLogger(msg);
+
             if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
                 msg instanceof TcpDiscoveryJoinRequestMessage ||
                 msg instanceof TcpDiscoveryCustomEventMessage ||
@@ -2278,6 +2291,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            DebugLogger log = messageLogger(msg);
+
             if (log.isDebugEnabled())
                 log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ",
id=" + msg.id() + ']');
 
@@ -2285,12 +2300,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName()
+ ", id=" + msg.id() + ']');
 
             if (locNode.internalOrder() == 0) {
-                boolean process = false;
+                boolean proc = false;
 
                 if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                    process = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode);
+                    proc = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode);
 
-                if (!process) {
+                if (!proc) {
                     if (log.isDebugEnabled()) {
                         log.debug("Ignore message, local node order is not initialized [msg="
+ msg +
                             ", locNode=" + locNode + ']');
@@ -2488,8 +2503,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     newNextNode = true;
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Next node remains the same [nextId=" + next.id() +
+                else if (log.isTraceEnabled())
+                    log.trace("Next node remains the same [nextId=" + next.id() +
                         ", nextOrder=" + next.internalOrder() + ']');
 
                 // Flag that shows whether next node exists and accepts incoming connections.
@@ -2752,8 +2767,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 onMessageExchanged();
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Message has been sent to next node [msg="
+ msg +
+                                DebugLogger debugLog = messageLogger(msg);
+
+                                if (debugLog.isDebugEnabled()) {
+                                    debugLog.debug("Message has been sent to next node [msg="
+ msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
                                 }
@@ -3783,7 +3800,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                joiningNodes.add(node.id());
+                synchronized (mux) {
+                    joiningNodes.add(node.id());
+                }
 
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null &&
spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
@@ -3895,7 +3914,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 n.visible(true);
                             }
 
-                            joiningNodes.clear();
+                            synchronized (mux) {
+                                joiningNodes.clear();
+                            }
 
                             locNode.setAttributes(node.attributes());
 
@@ -4021,7 +4042,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            joiningNodes.remove(nodeId);
+            synchronized (mux) {
+                joiningNodes.remove(nodeId);
+            }
 
             TcpDiscoverySpiState state = spiStateCopy();
 
@@ -4240,7 +4263,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
-                joiningNodes.remove(leftNode.id());
+                synchronized (mux) {
+                    joiningNodes.remove(leftNode.id());
+                }
 
                 spi.stats.onNodeLeft();
 
@@ -4418,7 +4443,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                         ", msg=" + msg.warning() + ']');
                 }
 
-                joiningNodes.remove(node.id());
+                synchronized (mux) {
+                    joiningNodes.remove(node.id());
+                }
 
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
@@ -4619,8 +4646,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId)
&& msg.senderNodeId() != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Discarding heartbeat message that has made two passes: " +
msg);
+                if (log.isTraceEnabled())
+                    log.trace("Discarding heartbeat message that has made two passes: " +
msg);
 
                 return;
             }
@@ -4821,18 +4848,28 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 assert ring.minimumNodeVersion() != null : ring;
 
+                boolean joiningEmpty;
+
+                synchronized (mux) {
+                    joiningEmpty = joiningNodes.isEmpty();
+                }
+
                 if (ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE)
>= 0)
-                    delayMsg = msg.topologyVersion() == 0L && !joiningNodes.isEmpty();
+                    delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
                 else
-                    delayMsg = !joiningNodes.isEmpty();
+                    delayMsg = !joiningEmpty;
 
                 if (delayMsg) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Delay custom message processing, there are joining nodes
[msg=" + msg +
-                            ", joiningNodes=" + joiningNodes + ']');
+                        synchronized (mux) {
+                            log.debug("Delay custom message processing, there are joining
nodes [msg=" + msg +
+                                ", joiningNodes=" + joiningNodes + ']');
+                        }
                     }
 
-                    pendingCustomMsgs.add(msg);
+                    synchronized (mux) {
+                        pendingCustomMsgs.add(msg);
+                    }
 
                     return;
                 }
@@ -4973,10 +5010,16 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Checks and flushes custom event messages if no nodes are attempting to join the
grid.
          */
         private void checkPendingCustomMessages() {
-            if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
+            boolean joiningEmpty;
+
+            synchronized (mux) {
+                joiningEmpty = joiningNodes.isEmpty();
+            }
+
+            if (joiningEmpty && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 
-                while ((msg = pendingCustomMsgs.poll()) != null) {
+                while ((msg = pollPendingCustomeMessage()) != null) {
                     processCustomMessage(msg);
 
                     if (msg.verified())
@@ -4986,6 +5029,15 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @return Pending custom message.
+         */
+        @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomeMessage() {
+            synchronized (mux) {
+                return pendingCustomMsgs.poll();
+            }
+        }
+
+        /**
          * @param msg Custom message.
          */
         private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
@@ -5134,8 +5186,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                     else
                         srvrSock = new ServerSocket(port, 0, spi.locHost);
 
-                    if (log.isInfoEnabled())
-                        log.info("Successfully bound to TCP port [port=" + port + ", localHost="
+ spi.locHost + ']');
+                    if (log.isInfoEnabled()) {
+                        log.info("Successfully bound to TCP port [port=" + port +
+                            ", localHost=" + spi.locHost +
+                            ", locNodeId=" + spi.ignite().configuration().getNodeId() +
+                            ']');
+                    }
 
                     return;
                 }
@@ -5450,7 +5506,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout()
:
+                long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout()
:
                     spi.getSocketTimeout();
 
                 while (!isInterrupted()) {
@@ -5460,8 +5516,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         msg.senderNodeId(nodeId);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Message has been received: " + msg);
+                        DebugLogger debugLog = messageLogger(msg);
+
+                        if (debugLog.isDebugEnabled())
+                            debugLog.debug("Message has been received: " + msg);
 
                         spi.stats.onMessageReceived(msg);
 
@@ -5469,7 +5527,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             debugLog(msg, "Message has been received: " + msg);
 
                         if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             continue;
                         }
@@ -5491,7 +5549,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                                     if (clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
@@ -5501,7 +5559,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     continue;
                                 }
                                 else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
+                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 
                                     break;
                                 }
@@ -5509,7 +5567,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             boolean ignored = false;
 
@@ -5538,7 +5596,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             boolean ignored = false;
 
@@ -5567,7 +5625,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             boolean ignored = false;
 
@@ -5596,7 +5654,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             boolean ignored = false;
 
@@ -5650,7 +5708,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             clientMsgWrk.addMessage(ack);
                         }
                         else
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                            spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                         if (heartbeatMsg != null)
                             processClientHeartbeatMessage(heartbeatMsg);
@@ -5914,7 +5972,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         /**
          * @param msg Message.
          */
-        public void addMessage(TcpDiscoveryAbstractMessage msg) {
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
             addMessage(msg, null);
         }
 
@@ -5922,7 +5980,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          * @param msgBytes Optional message bytes.
          */
-        public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes)
{
+        void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
             T2 t = new T2<>(msg, msgBytes);
 
             if (msg.highPriority())
@@ -5930,6 +5988,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 queue.add(t);
 
+            DebugLogger log = messageLogger(msg);
+
             if (log.isDebugEnabled())
                 log.debug("Message has been added to client queue: " + msg);
         }
@@ -5948,22 +6008,24 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msgBytes == null)
                     msgBytes = U.marshal(spi.marshaller(), msg);
 
+                DebugLogger msgLog = messageLogger(msg);
+
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
                     if (clientVer == null) {
                         ClusterNode node = spi.getNode(clientNodeId);
 
                         if (node != null)
                             clientVer = IgniteUtils.productVersion(node);
-                        else if (log.isDebugEnabled())
-                            log.debug("Skip sending message ack to client, fail to get client
node " +
+                        else if (msgLog.isDebugEnabled())
+                            msgLog.debug("Skip sending message ack to client, fail to get
client node " +
                                 "[sock=" + sock + ", locNodeId=" + getLocalNodeId() +
                                 ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
                     }
 
                     if (clientVer != null &&
                         clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION)
>= 0) {
-                        if (log.isDebugEnabled())
-                            log.debug("Sending message ack to client [sock=" + sock + ",
locNodeId="
+                        if (msgLog.isDebugEnabled())
+                            msgLog.debug("Sending message ack to client [sock=" + sock +
", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg="
+ msg + ']');
 
                         spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled()
?
@@ -5971,8 +6033,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
                 else {
-                    if (log.isDebugEnabled())
-                        log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+                    if (msgLog.isDebugEnabled())
+                        msgLog.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
                             + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg="
+ msg + ']');
 
                     assert topologyInitialized(msg) : msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 30b83e5..0816cbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -70,7 +70,33 @@ abstract class TcpDiscoveryImpl {
 
     /** Received messages. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    protected ConcurrentLinkedDeque<String> debugLog;
+    protected ConcurrentLinkedDeque<String> debugLogQ;
+
+    /** */
+    protected final ServerImpl.DebugLogger debugLog = new DebugLogger() {
+        /** {@inheritDoc} */
+        @Override public boolean isDebugEnabled() {
+            return log.isDebugEnabled();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void debug(String msg) {
+            log.debug(msg);
+        }
+    };
+
+    /** */
+    protected final ServerImpl.DebugLogger traceLog = new DebugLogger() {
+        /** {@inheritDoc} */
+        @Override public boolean isDebugEnabled() {
+            return log.isTraceEnabled();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void debug(String msg) {
+            log.trace(msg);
+        }
+    };
 
     /**
      * @param spi Adapter.
@@ -111,12 +137,12 @@ abstract class TcpDiscoveryImpl {
             "-" + locNode.internalOrder() + "] " +
             msg;
 
-        debugLog.add(msg0);
+        debugLogQ.add(msg0);
 
-        int delta = debugLog.size() - debugMsgHist;
+        int delta = debugLogQ.size() - debugMsgHist;
 
-        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
-            debugLog.poll();
+        for (int i = 0; i < delta && debugLogQ.size() > debugMsgHist; i++)
+            debugLogQ.poll();
     }
 
     /**
@@ -326,4 +352,27 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
+
+    /**
+     * @param msg Message.
+     * @return Message logger.
+     */
+    protected final DebugLogger messageLogger(TcpDiscoveryAbstractMessage msg) {
+        return msg.traceLogLevel() ? traceLog : debugLog;
+    }
+
+    /**
+     *
+     */
+    interface DebugLogger {
+        /**
+         * @return {@code True} if debug logging is enabled.
+         */
+        boolean isDebugEnabled();
+
+        /**
+         * @param msg Message to log.
+         */
+        void debug(String msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 24f2a5a..39170ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -99,6 +99,13 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     }
 
     /**
+     * @return {@code True} if this message should be logged at trace level instead of debug level.
+     */
+    public boolean traceLogLevel() {
+        return false;
+    }
+
+    /**
      * Gets creator node.
      *
      * @return Creator node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
index 6505765..0a656d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
@@ -53,6 +53,11 @@ public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean traceLogLevel() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean highPriority() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
index 3993de0..ade5468 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Heartbeat message.
  * <p>
- * Client sends his hearbeats in this message.
+ * Client sends his heartbeats in this message.
  */
 public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     /** */
@@ -61,6 +61,11 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess
     }
 
     /** {@inheritDoc} */
+    @Override public boolean traceLogLevel() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
index a152936..7793a3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -50,6 +50,11 @@ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMess
     }
 
     /** {@inheritDoc} */
+    @Override public boolean traceLogLevel() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         // This method has been left empty intentionally to keep message size at min.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index 338e3f5..0ae253a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -214,6 +214,11 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean traceLogLevel() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean highPriority() {
         return true;
     }


Mime
View raw message