ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4296-1
Date Mon, 28 Nov 2016 10:53:51 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4296-1 [created] 39532a3ee


ignite-4296-1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39532a3e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39532a3e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39532a3e

Branch: refs/heads/ignite-4296-1
Commit: 39532a3eed0c4f37052f8e134ca9f71c9c82d348
Parents: 460a262
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 28 13:01:49 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 28 13:01:49 2016 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  20 +++
 .../cache/CacheAffinitySharedManager.java       |   8 ++
 .../GridCachePartitionExchangeManager.java      |  18 +++
 .../GridDhtPartitionsExchangeFuture.java        | 104 +++++++++++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 142 ++++++++++++++++---
 .../tcp/internal/TcpDiscoveryStatistics.java    |  12 +-
 .../TcpDiscoveryJoinRequestMessage.java         |  13 ++
 7 files changed, 292 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 488dabe..662fa68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1621,6 +1621,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
     }
 
     /**
+     * @param topVer Topology version.
+     * @param node Node.
+     * @return DHT caches started on given node.
+     */
+    public Collection<String> nodeCaches(AffinityTopologyVersion topVer, ClusterNode
node) {
+        return resolveDiscoCache(null, topVer).nodeCaches(node);
+    }
+
+    /**
      * Gets cache remote nodes for cache with given name.
      *
      * @param topVer Topology version.
@@ -2692,6 +2701,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
             this.nodeMap = nodeMap;
         }
 
+        Collection<String> nodeCaches(ClusterNode node) {
+            List<String> res = new ArrayList<>();
+
+            for (Map.Entry<String, Collection<ClusterNode>> e : affCacheNodes.entrySet())
{
+                if (F.contains(e.getValue(), node))
+                    res.add(e.getKey());
+            }
+
+            return res;
+        }
+
         /**
          * Adds node to map.
          *

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..77563ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1517,6 +1517,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         return assignment;
     }
 
+    public GridAffinityAssignmentCache cacheAssignment(Integer cacheId) {
+        CacheHolder holder = caches.get(cacheId);
+
+        assert holder != null;
+
+        return holder.affinity();
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7a24aa1..2d2f738 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -119,6 +120,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Exchange history size. */
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
+    /** */
+    private boolean skipFirstExchangeMsg;
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
 
@@ -300,10 +304,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     };
 
+    public boolean skipFirstExchangeMessage() {
+        return skipFirstExchangeMsg;
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
+        if (getBoolean("SKIP_FIRST_EXCHANGE_MSG", false)) {
+            if (cctx.kernalContext().config().isLateAffinityAssignment()) {
+                skipFirstExchangeMsg = true;
+
+                cctx.kernalContext().addNodeAttribute("SKIP_FIRST_EXCHANGE_MSG", true);
+            }
+            else
+                U.warn(log, "Can not use SKIP_FIRST_EXCHANGE_MSG optimization when late affinity
assignment disabled.");
+        }
+
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT,
EVT_NODE_FAILED,

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e945de9..4e269cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -81,6 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 
 /**
  * Future for exchanging partition maps.
@@ -115,6 +116,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private int pendingSingleUpdates;
 
     /** */
+    public int singleMsgUpdateCnt;
+
+    /** */
+    private long singleMsgUpdateMaxTime;
+
+    /** */
+    private long singleMsgUpdateMinTime = Long.MAX_VALUE;
+
+    /** */
     @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
@@ -435,6 +445,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert !dummy && !forcePreload : this;
 
         try {
+            long initStart = System.currentTimeMillis();
+
+            log.info("Start exchange init [topVer=" + topologyVersion() + ']');
+
             srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -466,10 +480,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                 }
 
+                long affStart = System.currentTimeMillis();
+
                 if (CU.clientNode(discoEvt.eventNode()))
                     exchange = onClientNodeEvent(crdNode);
                 else
                     exchange = onServerNodeEvent(crdNode);
+
+                log.info("Affinity call time [topVer=" + topologyVersion() + ", time=" +
(System.currentTimeMillis() - affStart) + ']');
             }
 
             updateTopologies(crdNode);
@@ -500,6 +518,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 default:
                     assert false;
             }
+
+            log.info("Finish exchange init [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis()
- initStart) + ']');
         }
         catch (IgniteInterruptedCheckedException e) {
             onDone(e);
@@ -738,11 +758,65 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
 
         if (crd.isLocal()) {
+            ClusterNode node = discoEvt.eventNode();
+
+            Object attr = node.attribute("SKIP_FIRST_EXCHANGE_MSG");
+
+            boolean skipFirstExchange = Boolean.TRUE.equals(attr) || ((attr instanceof String)
&& "true".equalsIgnoreCase((String)attr));
+
+            if (discoEvt.type() == EVT_NODE_JOINED && !node.isLocal() &&
skipFirstExchange) {
+                assert !CU.clientNode(node) : discoEvt;
+                assert srvNodes.contains(node);
+
+                boolean rmv = remaining.remove(node.id());
+                assert rmv;
+
+                Collection<String> caches = cctx.discovery().nodeCaches(topologyVersion(),
node);
+
+                if (!caches.isEmpty()) {
+                    GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+                        false,
+                        null,
+                        false);
+
+                    for (String cache : caches) {
+                        Map<Integer, GridDhtPartitionState> m = new HashMap<>();
+
+                        GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(),
1, topologyVersion(), m, true);
+
+                        GridAffinityAssignmentCache assign = cctx.affinity().cacheAssignment(CU.cacheId(cache));
+
+                        for (Integer part : assign.primaryPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
+                        for (Integer part : assign.backupPartitions(node.id(), topologyVersion()))
+                            partMap.put(part, MOVING);
+
+                        msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null);
+                    }
+
+                    updatePartitionSingleMap(msg);
+                }
+            }
+
             if (remaining.isEmpty())
                 onAllReceived(false);
         }
-        else
-            sendPartitions(crd);
+        else {
+            boolean skipSnd = false;
+
+            if (cctx.exchange().skipFirstExchangeMessage() && discoEvt.type() ==
EVT_NODE_JOINED && discoEvt.eventNode().isLocal())
+                skipSnd = true;
+
+            if (!skipSnd) {
+                long sndStart = System.currentTimeMillis();
+
+                sendPartitions(crd);
+
+                log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis()
- sndStart) + ']');
+            }
+            else
+                log.info("Skip first exchange message [topVer=" + topologyVersion() + ']');
+        }
 
         initDone();
     }
@@ -1051,9 +1125,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
 
         if (super.onDone(res, err) && realExchange) {
+            if (singleMsgUpdateCnt > 0) {
+                log.info("Completed partition exchange [maxSingleUpdate=" + singleMsgUpdateMaxTime
+
+                    ", minSingleUpdate=" + singleMsgUpdateMinTime +
+                    ", duration=" + duration() +
+                    ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+            }
+            else {
+                log.info("Completed partition exchange [duration=" + duration() +
+                    ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+            }
+
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId()
+ ", exchange= " + this +
-                    "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis()
- initTs) + ']');
+                    ", duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis()
- initTs) + ']');
 
             initFut.onDone(err == null);
 
@@ -1168,6 +1253,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
+        long start = U.currentTimeMillis();
+
         synchronized (mux) {
             assert crd != null;
 
@@ -1190,6 +1277,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             finally {
                 synchronized (mux) {
+                    long time = U.currentTimeMillis() - start;
+
+                    singleMsgUpdateCnt++;
+
+                    if (time > singleMsgUpdateMaxTime)
+                        singleMsgUpdateMaxTime = time;
+                    if (time < singleMsgUpdateMinTime)
+                        singleMsgUpdateMinTime = time;
+
                     assert pendingSingleUpdates > 0;
 
                     pendingSingleUpdates--;
@@ -1252,6 +1348,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         try {
             assert crd.isLocal();
 
+            log.info("Coordinator exchange all received [topVer=" + topologyVersion() + ']');
+
             if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (!cacheCtx.isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0e90418..8927a32 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -176,6 +176,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         IgniteProductVersion.fromString("1.5.0");
 
     /** */
+    private static final boolean SEND_JOIN_REQ_DIRECTLY = getBoolean("SEND_JOIN_REQ_DIRECTLY",
false);
+
+    /** */
     private IgniteThreadPoolExecutor utilityPool;
 
     /** Nodes ring. */
@@ -926,6 +929,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         if (log.isDebugEnabled())
             log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+
+        log.info("Node joined topology: " + locNode);
     }
 
     /**
@@ -982,7 +987,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             for (InetSocketAddress addr : addrs) {
                 try {
-                    Integer res = sendMessageDirectly(joinReq, addr);
+                    Integer res = sendMessageDirectly(joinReq, addr, true);
 
                     assert res != null;
 
@@ -1095,13 +1100,15 @@ class ServerImpl extends TcpDiscoveryImpl {
      *
      * @param msg Message to send.
      * @param addr Address to send message to.
+     * @param join {@code True} if sends initial node join request.
      * @return Response read from the recipient or {@code null} if no response is supposed.
      * @throws IgniteSpiException If an error occurs.
      */
-    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress
addr)
+    @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress
addr, boolean join)
         throws IgniteSpiException {
         assert msg != null;
         assert addr != null;
+        assert !join || msg instanceof TcpDiscoveryJoinRequestMessage;
 
         Collection<Throwable> errs = null;
 
@@ -1188,7 +1195,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Connection has been established, but
                 // join request may not be unmarshalled on remote host.
                 // E.g. due to class not found issue.
-                joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+                joinReqSent = join;
 
                 int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
@@ -3762,8 +3769,79 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (nodeAddedMsg.verified())
                     msgHist.add(nodeAddedMsg);
             }
-            else if (sendMessageToRemotes(msg))
-                sendMessageAcrossRing(msg);
+            else {
+                if (sendMessageToRemotes(msg)) {
+                    if (SEND_JOIN_REQ_DIRECTLY && !msg.directSendFailed()) {
+                        final TcpDiscoveryNode crd = resolveCoordinator();
+
+                        Collection<TcpDiscoveryNode> failedNodes;
+
+                        synchronized (mux) {
+                            failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
+                        }
+
+                        TcpDiscoveryNode next = ring.nextNode(failedNodes);
+
+                        if (crd != null && !crd.equals(next)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Will send join request directly to coordinator
" +
+                                    "[msg=" + msg + ", crd=" + crd + ", next=" + next + ']');
+                            }
+
+                            log.info("Will send join request directly to coordinator " +
+                                "[cnt=" + joiningNodes.size() + ", msg=" + msg + ", crd="
+ crd + ", next=" + next + ']');
+
+                            utilityPool.submit(new Runnable() {
+                                @Override public void run() {
+                                    IgniteSpiException sndErr = null;
+                                    Integer res = null;
+
+                                    TcpDiscoveryJoinRequestMessage msg0 =
+                                        new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData());
+
+                                    try {
+                                        res = trySendMessageDirectly(crd, msg0);
+
+                                        if (F.eq(RES_OK, res)) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Sent join request directly to
coordinator " +
+                                                    "[msg=" + msg0 + ", crd=" + crd + ']');
+                                            }
+
+                                            log.info("Sent join request directly to coordinator
" +
+                                                "[msg=" + msg0 + ", crd=" + crd + ']');
+
+                                            return;
+                                        }
+                                    }
+                                    catch (IgniteSpiException e) {
+                                        sndErr = e;
+                                    }
+
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Failed to send join request to coordinator,
will process from " +
+                                            "message worker [msg=" + msg0 + ", crd=" + crd
+ ", err=" + sndErr +
+                                            ", res=" + res + ']');
+                                    }
+
+                                    log.info("Failed to send join request to coordinator,
will process from " +
+                                        "message worker [msg=" + msg0 + ", crd=" + crd +
", err=" + sndErr +
+                                        ", res=" + res + ']');
+
+                                    msg.directSendFailed(true);
+
+                                    msgWorker.addMessage(msg);
+                                }
+                            });
+
+                            return;
+                        }
+                    }
+
+                    sendMessageAcrossRing(msg);
+                }
+
+            }
         }
 
         /**
@@ -3798,7 +3876,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          * @throws IgniteSpiException Last failure if all attempts failed.
          */
-        private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage
msg)
+        private Integer trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage
msg)
             throws IgniteSpiException {
             if (node.isClient()) {
                 TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId());
@@ -3819,25 +3897,21 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     worker.addMessage(msg);
 
-                    return;
+                    return null;
                 }
 
-                trySendMessageDirectly(routerNode, msg);
-
-                return;
+                return trySendMessageDirectly(routerNode, msg);
             }
 
             IgniteSpiException ex = null;
 
             for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode,
node))) {
                 try {
-                    sendMessageDirectly(msg, addr);
+                    Integer res = sendMessageDirectly(msg, addr, false);
 
                     node.lastSuccessfulAddress(addr);
 
-                    ex = null;
-
-                    break;
+                    return res;
                 }
                 catch (IgniteSpiException e) {
                     ex = e;
@@ -3846,6 +3920,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (ex != null)
                 throw ex;
+
+            return null;
         }
 
         /**
@@ -3968,7 +4044,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (isLocalNodeCoordinator()) {
                 if (msg.verified()) {
-                    spi.stats.onRingMessageReceived(msg);
+                    Long time = spi.stats.onRingMessageReceived(msg);
+
+                    if (time != null)
+                        log.info("TcpDiscoveryNodeAddedMessage ring time: " + time);
 
                     TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId,
                         node.id());
@@ -4277,7 +4356,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (locNodeCoord) {
                 if (msg.verified()) {
-                    spi.stats.onRingMessageReceived(msg);
+                    Long time = spi.stats.onRingMessageReceived(msg);
+
+                    if (time != null)
+                        log.info("TcpDiscoveryNodeAddFinishedMessage ring time: " + time);
 
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
@@ -5164,7 +5246,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 else {
                     addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(),
true));
 
-                    spi.stats.onRingMessageReceived(msg);
+                    Long time = spi.stats.onRingMessageReceived(msg);
+
+                    if (time != null)
+                        log.info("TcpDiscoveryCustomEventMessage ring time: " + time);
 
                     DiscoverySpiCustomMessage msgObj = null;
 
@@ -5561,6 +5646,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             ClientMessageWorker clientMsgWrk = null;
 
+            TcpDiscoveryAbstractMessage msg = null;
+            Exception sockE = null;
+
             try {
                 InputStream in;
 
@@ -5621,7 +5709,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Restore timeout.
                     sock.setSoTimeout(timeout);
 
-                    TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+                    msg = spi.readMessage(sock, in, spi.netTimeout);
 
                     // Ping.
                     if (msg instanceof TcpDiscoveryPingRequest) {
@@ -5726,6 +5814,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
                 catch (IOException e) {
+                    sockE = e;
+
                     if (log.isDebugEnabled())
                         U.error(log, "Caught exception on handshake [err=" + e +", sock="
+ sock + ']', e);
 
@@ -5753,6 +5843,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
                 catch (IgniteCheckedException e) {
+                    sockE = e;
+
                     if (log.isDebugEnabled())
                         U.error(log, "Caught exception on handshake [err=" + e +", sock="
+ sock + ']', e);
 
@@ -5782,8 +5874,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 while (!isInterrupted()) {
                     try {
-                        TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                        msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
 
                         msg.senderNodeId(nodeId);
 
@@ -5810,9 +5901,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 if (clientMsgWrk != null && ok)
                                     continue;
-                                else
+                                else {
+                                    log.info("Processed join request, close connection [msg="
+ msg + ']');
+
                                     // Direct join request - no need to handle this socket
anymore.
                                     break;
+                                }
                             }
                         }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -5985,6 +6079,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             processClientHeartbeatMessage(heartbeatMsg);
                     }
                     catch (IgniteCheckedException e) {
+                        sockE = e;
+
                         if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock
+
                                 ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']',
e);
@@ -6012,6 +6108,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
                     catch (IOException e) {
+                        sockE = e;
+
                         if (log.isDebugEnabled())
                             U.error(log, "Caught exception on message read [sock=" + sock
+ ", locNodeId=" + locNodeId +
                                 ", rmtNodeId=" + nodeId + ']', e);
@@ -6035,6 +6133,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             finally {
+                log.info("Close sock [readers=" + spi.stats.socketReaders() + ", msg=" +
msg + ", err=" + sockE + ']');
+
                 if (clientMsgWrk != null) {
                     if (log.isDebugEnabled())
                         log.debug("Client connection failed [sock=" + sock + ", locNodeId="
+ locNodeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index bb5ab66..e725d00 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
@@ -320,6 +321,7 @@ public class TcpDiscoveryStatistics {
         if (crdSinceTs.get() > 0 &&
             (msg instanceof TcpDiscoveryCustomEventMessage) ||
             (msg instanceof TcpDiscoveryNodeAddedMessage) ||
+            (msg instanceof TcpDiscoveryNodeAddFinishedMessage) ||
             (msg instanceof TcpDiscoveryNodeLeftMessage) ||
             (msg instanceof TcpDiscoveryNodeFailedMessage)) {
             ringMsgsSndTs.put(msg.id(), U.currentTimeMillis());
@@ -383,7 +385,7 @@ public class TcpDiscoveryStatistics {
      *
      * @param msg Message.
      */
-    public synchronized void onRingMessageReceived(TcpDiscoveryAbstractMessage msg) {
+    public synchronized Long onRingMessageReceived(TcpDiscoveryAbstractMessage msg) {
         assert msg != null;
 
         Long sentTs = ringMsgsSndTs.get(msg.id());
@@ -399,7 +401,11 @@ public class TcpDiscoveryStatistics {
 
             if (ringMsgsSent != 0)
                 avgRingMsgTime = (avgRingMsgTime * (ringMsgsSent - 1) + duration) / ringMsgsSent;
+
+            return duration;
         }
+
+        return null;
     }
 
     /**
@@ -630,6 +636,10 @@ public class TcpDiscoveryStatistics {
         return sockReadersCreated;
     }
 
+    public synchronized int socketReaders() {
+        return sockReadersCreated - sockReadersRmv;
+    }
+
     /**
      * Gets socket readers removed count.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/39532a3e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 22ffae8..4422919 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -36,6 +37,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
     /** Discovery data. */
     private final Map<Integer, byte[]> discoData;
 
+    /** */
+    @GridToStringExclude
+    private transient boolean directSndFailed;
+
     /**
      * Constructor.
      *
@@ -79,6 +84,14 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
         setFlag(RESPONDED_FLAG_POS, responded);
     }
 
+    public boolean directSendFailed() {
+        return directSndFailed;
+    }
+
+    public void directSendFailed(boolean directSndFailed) {
+        this.directSndFailed = directSndFailed;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
         // NOTE!


Mime
View raw message