ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [11/50] incubator-ignite git commit: # ignite-883
Date Tue, 16 Jun 2015 23:41:20 GMT
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/de0d61f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/de0d61f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/de0d61f6

Branch: refs/heads/ignite-884
Commit: de0d61f6a3467379c168a15bbd9c4580aeebb092
Parents: 89a4f7c
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jun 11 10:40:27 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jun 11 17:40:01 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   1 -
 .../discovery/GridDiscoveryManager.java         |   9 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  28 ++-
 .../communication/tcp/TcpCommunicationSpi.java  |   2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 214 ++++++++++++-------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  64 ------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  66 ++++++
 .../distributed/IgniteCacheManyClientsTest.java |  87 +++++++-
 9 files changed, 323 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4f5e365..f38fee1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1001,7 +1001,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
         A.notNull(cfg.getMBeanServer(), "cfg.getMBeanServer()");
         A.notNull(cfg.getGridLogger(), "cfg.getGridLogger()");
         A.notNull(cfg.getMarshaller(), "cfg.getMarshaller()");
-        A.notNull(cfg.getPublicThreadPoolSize(), "cfg.getPublicThreadPoolSize()");
         A.notNull(cfg.getUserAttributes(), "cfg.getUserAttributes()");
 
         // All SPIs should be non-null.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 71fbc61..464110c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2057,17 +2057,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
         private final AffinityTopologyVersion topVer;
 
         /** */
+        @GridToStringExclude
         private final DiscoCache discoCache;
 
         /**
          * @param topVer Topology version.
          * @param discoCache Disco cache.
          */
-        private Snapshot(AffinityTopologyVersion topVer,
-            DiscoCache discoCache) {
+        private Snapshot(AffinityTopologyVersion topVer, DiscoCache discoCache) {
             this.topVer = topVer;
             this.discoCache = discoCache;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Snapshot.class, this);
+        }
     }
 
     /** Cache for discovery collections. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 51010ce..0355bb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -102,7 +102,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 boolean set = topVer.setIfGreater(e.topologyVersion());
 
                 assert set : "Have you configured TcpDiscoverySpi for your in-memory data
grid? [newVer=" +
-                    e.topologyVersion() + ", curVer=" + topVer.get() + ']';
+                    e.topologyVersion() + ", curVer=" + topVer.get() + ", evt=" + e + ']';
 
                 if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
                     for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 6e7a706..476f8a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -585,8 +585,32 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         GridDummySpiContext(ClusterNode locNode, boolean stopping, @Nullable IgniteSpiContext
spiCtx) {
             this.locNode = locNode;
             this.stopping = stopping;
-            this.msgFactory = spiCtx != null ? spiCtx.messageFactory() : null;
-            this.msgFormatter = spiCtx != null ? spiCtx.messageFormatter() : null;
+
+            MessageFactory msgFactory0 = spiCtx != null ? spiCtx.messageFactory() : null;
+            MessageFormatter msgFormatter0 = spiCtx != null ? spiCtx.messageFormatter() :
null;
+
+            if (msgFactory0 == null) {
+                msgFactory0 = new MessageFactory() {
+                    @Nullable @Override public Message create(byte type) {
+                        throw new IgniteException("Failed to read message, node is not started.");
+                    }
+                };
+            }
+
+            if (msgFormatter0 == null) {
+                msgFormatter0 = new MessageFormatter() {
+                    @Override public MessageWriter writer() {
+                        throw new IgniteException("Failed to write message, node is not started.");
+                    }
+
+                    @Override public MessageReader reader(MessageFactory factory) {
+                        throw new IgniteException("Failed to read message, node is not started.");
+                    }
+                };
+            }
+
+            this.msgFactory = msgFactory0;
+            this.msgFormatter = msgFormatter0;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a661965..f19e25b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     if (X.hasCause(e, SocketTimeoutException.class))
                         LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout'
" +
-                            "configuration property) [addr=" + addr + ']');
+                            "configuration property) [addr=" + addr + ", connTimeout" + connTimeout
+ ']');
 
                     if (errs == null)
                         errs = new IgniteCheckedException("Failed to connect to node (is
node still alive?). " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d064c8d..23297ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -195,7 +195,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
             }
             catch (InterruptedException ignored) {
-
+                // No-op.
             }
         }
 
@@ -282,7 +282,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             return false;
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSpiException(e); // Should newer occur
+            throw new IgniteSpiException(e); // Should newer occur.
         }
     }
 
@@ -347,7 +347,10 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * @param recon {@code True} if reconnects.
      * @return Opened socket or {@code null} if timeout.
+     * @throws InterruptedException If interrupted.
+     * @throws IgniteSpiException If failed.
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
@@ -387,71 +390,152 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 InetSocketAddress addr = it.next();
 
-                Socket sock = null;
+                T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
 
-                try {
-                    long ts = U.currentTimeMillis();
+                if (sockAndRes == null) {
+                    it.remove();
 
-                    IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+                    continue;
+                }
 
-                    sock = t.get1();
+                assert sockAndRes.get1() != null;
+                assert sockAndRes.get2() != null;
 
-                    UUID rmtNodeId = t.get2();
+                Socket sock = sockAndRes.get1();
 
-                    spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+                switch (sockAndRes.get2()) {
+                    case RES_OK:
+                        return sock;
 
-                    locNode.clientRouterNodeId(rmtNodeId);
+                    case RES_CONTINUE_JOIN:
+                    case RES_WAIT:
+                        U.closeQuiet(sock);
 
-                    TcpDiscoveryAbstractMessage msg = recon ?
-                        new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
-                            lastMsgId) :
-                        new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+                        break;
 
-                    msg.client(true);
+                    default:
+                        if (log.isDebugEnabled())
+                            log.debug("Received unexpected response to join request: " +
sockAndRes.get2());
 
-                    spi.writeToSocket(sock, msg);
+                        U.closeQuiet(sock);
+                }
+            }
 
-                    int res = spi.readReceipt(sock, spi.ackTimeout);
+            if (addrs.isEmpty()) {
+                if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime)
> spi.joinTimeout)
+                    return null;
 
-                    switch (res) {
-                        case RES_OK:
-                            return sock;
+                Thread.sleep(2000);
 
-                        case RES_CONTINUE_JOIN:
-                        case RES_WAIT:
-                            U.closeQuiet(sock);
+                U.warn(log, "Failed to connect to any address from IP finder (will retry
to join topology " +
+                    "in 2000ms): " + addrs0);
+            }
+        }
+    }
 
-                            break;
+    /**
+     * @param recon {@code True} if reconnects.
+     * @param addr Address.
+     * @return Socket and connect response.
+     */
+    @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress
addr) {
+        assert addr != null;
 
-                        default:
-                            if (log.isDebugEnabled())
-                                log.debug("Received unexpected response to join request:
" + res);
+        Collection<Throwable> errs = null;
 
-                            U.closeQuiet(sock);
-                    }
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (log.isDebugEnabled())
-                        U.error(log, "Failed to establish connection with address: " + addr,
e);
+        long ackTimeout0 = spi.ackTimeout;
 
-                    U.closeQuiet(sock);
+        int connectAttempts = 1;
 
-                    it.remove();
-                }
+        UUID locNodeId = getLocalNodeId();
+
+        for (int i = 0; i < spi.reconCnt; i++) {
+            boolean openSock = false;
+
+            Socket sock = null;
+
+            try {
+                long tstamp = U.currentTimeMillis();
+
+                sock = spi.openSocket(addr);
+
+                openSock = true;
+
+                TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+                req.client(true);
+
+                spi.writeToSocket(sock, req);
+
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+
+                UUID rmtNodeId = res.creatorNodeId();
+
+                assert rmtNodeId != null;
+                assert !getLocalNodeId().equals(rmtNodeId);
+
+                spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+                locNode.clientRouterNodeId(rmtNodeId);
+
+                tstamp = U.currentTimeMillis();
+
+                TcpDiscoveryAbstractMessage msg = recon ?
+                    new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId)
:
+                    new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+
+                msg.client(true);
+
+                spi.writeToSocket(sock, msg);
+
+                spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+                if (log.isDebugEnabled())
+                    log.debug("Message has been sent to address [msg=" + msg + ", addr="
+ addr +
+                        ", rmtNodeId=" + rmtNodeId + ']');
+
+                return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
             }
+            catch (IOException | IgniteCheckedException e) {
+                U.closeQuiet(sock);
 
-            if (addrs.isEmpty()) {
-                U.warn(log, "Failed to connect to any address from IP finder (will retry
to join topology " +
-                    "in 2000ms): " + addrs0);
+                if (log.isDebugEnabled())
+                    log.error("Exception on joining: " + e.getMessage(), e);
 
-                if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime)
> spi.joinTimeout)
-                    return null;
+                onException("Exception on joining: " + e.getMessage(), e);
 
-                Thread.sleep(2000);
+                if (errs == null)
+                    errs = new ArrayList<>();
+
+                errs.add(e);
+
+                if (!openSock) {
+                    // Reconnect for the second time, if connection is not established.
+                    if (connectAttempts < 2) {
+                        connectAttempts++;
+
+                        continue;
+                    }
+
+                    break; // Don't retry if we can not establish connection.
+                }
+
+                if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))
{
+                    ackTimeout0 *= 2;
+
+                    if (!checkAckTimeout(ackTimeout0))
+                        break;
+                }
             }
         }
+
+        if (log.isDebugEnabled())
+            log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ",
errs=" + errs + ']');
+
+        return null;
     }
 
+
     /**
      * @param topVer New topology version.
      * @return Latest topology snapshot.
@@ -493,33 +577,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         return allNodes;
     }
 
-    /**
-     * @param addr Address.
-     * @return Remote node ID.
-     * @throws IOException In case of I/O error.
-     * @throws IgniteCheckedException In case of other error.
-     */
-    private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws
IOException, IgniteCheckedException {
-        assert addr != null;
-
-        Socket sock = spi.openSocket(addr);
-
-        TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
-
-        req.client(true);
-
-        spi.writeToSocket(sock, req);
-
-        TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout);
-
-        UUID nodeId = res.creatorNodeId();
-
-        assert nodeId != null;
-        assert !getLocalNodeId().equals(nodeId);
-
-        return F.t(sock, nodeId);
-    }
-
     /** {@inheritDoc} */
     @Override void simulateNodeFailure() {
         U.warn(log, "Simulating client node failure: " + getLocalNodeId());
@@ -736,7 +793,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         *
+         * @return {@code True} if connection is alive.
          */
         public boolean isOnline() {
             synchronized (mux) {
@@ -780,7 +837,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
-                        U.error(log, "Failed to send node left message (will stop anyway)
[sock=" + sock + ']', e);
+                        U.error(log, "Failed to send node left message (will stop anyway)
" +
+                            "[sock=" + sock + ", msg=" + msg + ']', e);
 
                     U.closeQuiet(sock);
 
@@ -909,7 +967,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 final Socket sock = joinTopology(false);
 
                 if (sock == null) {
-                    joinErr = new IgniteSpiException("Join process timed out");
+                    joinErr = new IgniteSpiException("Join process timed out.");
 
                     joinLatch.countDown();
 
@@ -934,8 +992,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     if (msg == JOIN_TIMEOUT) {
                         if (joinLatch.getCount() > 0) {
-                            joinErr = new IgniteSpiException("Join process timed out [sock="
+ sock +
-                                ", timeout=" + spi.netTimeout + ']');
+                            joinErr = new IgniteSpiException("Join process timed out, did
not receive response for " +
+                                "join request (consider increasing 'networkTimeout' configuration
property) " +
+                                "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']');
 
                             joinLatch.countDown();
 
@@ -1027,7 +1086,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 if (joinLatch.getCount() > 0) {
                     // This should not occurs.
-                    joinErr = new IgniteSpiException("Some error occurs in joinig process");
+                    joinErr = new IgniteSpiException("Some error occur in join process.");
 
                     joinLatch.countDown();
                 }
@@ -1236,8 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl {
             if (spi.getSpiContext().isStopping()) {
                 if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId()))
{
                     if (leaveLatch.getCount() > 0) {
-                        log.debug("Remote node fail this node while node is stopping [locNode="
+ getLocalNodeId()
-                            + ", rmtNode=" + msg.creatorNodeId() + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("Remote node fail this node while node is stopping
[locNode=" + getLocalNodeId()
+                                + ", rmtNode=" + msg.creatorNodeId() + ']');
 
                         leaveLatch.countDown();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5aceaae..311c783 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -125,16 +125,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID,
Boolean>>> pingMap =
         new ConcurrentHashMap8<>();
 
-    /** Debug mode. */
-    private boolean debugMode;
-
-    /** Debug messages history. */
-    private int debugMsgHist = 512;
-
-    /** Received messages. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private ConcurrentLinkedDeque<String> debugLog;
-
     /**
      * @param adapter Adapter.
      */
@@ -142,24 +132,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         super(adapter);
     }
 
-    /**
-     * This method is intended for troubleshooting purposes only.
-     *
-     * @param debugMode {code True} to start SPI in debug mode.
-     */
-    public void setDebugMode(boolean debugMode) {
-        this.debugMode = debugMode;
-    }
-
-    /**
-     * This method is intended for troubleshooting purposes only.
-     *
-     * @param debugMsgHist Message history log size.
-     */
-    public void setDebugMessageHistory(int debugMsgHist) {
-        this.debugMsgHist = debugMsgHist;
-    }
-
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         synchronized (mux) {
@@ -1060,23 +1032,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * @param ackTimeout Acknowledgement timeout.
-     * @return {@code True} if acknowledgement timeout is less or equal to
-     * maximum acknowledgement timeout, {@code false} otherwise.
-     */
-    private boolean checkAckTimeout(long ackTimeout) {
-        if (ackTimeout > spi.maxAckTimeout) {
-            LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement
timeout " +
-                "(consider increasing 'maxAckTimeout' configuration property) " +
-                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
      * Notify external listener on discovery event.
      *
      * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent}
for more details.
@@ -1422,25 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /**
      * @param msg Message.
-     */
-    private void debugLog(String msg) {
-        assert debugMode;
-
-        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis()))
+
-            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
-            "-" + locNode.internalOrder() + "] " +
-            msg;
-
-        debugLog.add(msg0);
-
-        int delta = debugLog.size() - debugMsgHist;
-
-        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
-            debugLog.poll();
-    }
-
-    /**
-     * @param msg Message.
      * @return {@code True} if recordable in debug mode.
      */
     private boolean recordable(TcpDiscoveryAbstractMessage msg) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b7e9e53..94097c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -26,7 +26,9 @@ import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.jetbrains.annotations.*;
 
+import java.text.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  *
@@ -50,6 +52,16 @@ abstract class TcpDiscoveryImpl {
     /** */
     protected TcpDiscoveryNode locNode;
 
+    /** Debug mode. */
+    protected boolean debugMode;
+
+    /** Debug messages history. */
+    private int debugMsgHist = 512;
+
+    /** Received messages. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    protected ConcurrentLinkedDeque<String> debugLog;
+
     /**
      * @param spi Adapter.
      */
@@ -60,6 +72,43 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMode {code True} to start SPI in debug mode.
+     */
+    public void setDebugMode(boolean debugMode) {
+        this.debugMode = debugMode;
+    }
+
+    /**
+     * This method is intended for troubleshooting purposes only.
+     *
+     * @param debugMsgHist Message history log size.
+     */
+    public void setDebugMessageHistory(int debugMsgHist) {
+        this.debugMsgHist = debugMsgHist;
+    }
+
+    /**
+     * @param msg Message.
+     */
+    protected void debugLog(String msg) {
+        assert debugMode;
+
+        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis()))
+
+            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+            "-" + locNode.internalOrder() + "] " +
+            msg;
+
+        debugLog.add(msg0);
+
+        int delta = debugLog.size() - debugMsgHist;
+
+        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
+            debugLog.poll();
+    }
+
+    /**
      * @return Local node ID.
      */
     public UUID getLocalNodeId() {
@@ -209,4 +258,21 @@ abstract class TcpDiscoveryImpl {
             }
         }
     }
+
+    /**
+     * @param ackTimeout Acknowledgement timeout.
+     * @return {@code True} if acknowledgement timeout is less or equal to
+     * maximum acknowledgement timeout, {@code false} otherwise.
+     */
+    protected boolean checkAckTimeout(long ackTimeout) {
+        if (ackTimeout > spi.maxAckTimeout) {
+            LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement
timeout " +
+                "(consider increasing 'maxAckTimeout' configuration property) " +
+                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+
+            return false;
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de0d61f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 24ebb7c..77ddd40 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -18,15 +18,17 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -54,6 +56,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setConnectorConfiguration(null);
+        cfg.setPeerClassLoadingEnabled(false);
+        cfg.setTimeServerPortRange(200);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         if (!clientDiscovery)
@@ -61,6 +69,12 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
 
         cfg.setClientMode(client);
 
+        if (client) {
+//            cfg.setPublicThreadPoolSize(1);
+//            cfg.setPeerClassLoadingThreadPoolSize(1);
+//            cfg.setIgfsThreadPoolSize(1);
+        }
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -85,6 +99,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
         stopAllGrids();
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -104,6 +123,66 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testManyClientsSequentiallyClientDiscovery() throws Exception {
+        clientDiscovery = true;
+
+        manyClientsSequentially();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void manyClientsSequentially() throws Exception {
+        client = true;
+
+        List<Ignite> clients = new ArrayList<>();
+
+        final int CLIENTS = 50;
+
+        int idx = SRVS;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite ignite = startGrid(idx++);
+
+            log.info("Started node: " + ignite.name());
+
+            assertTrue(ignite.configuration().isClientMode());
+
+            clients.add(ignite);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            Integer key = rnd.nextInt(0, 1000);
+
+            cache.put(key, i);
+
+            assertNotNull(cache.get(key));
+        }
+
+        log.info("All clients started.");
+
+        assertEquals(SRVS + CLIENTS, G.allGrids().size());
+
+        long topVer = -1L;
+
+        for (Ignite ignite : G.allGrids()) {
+            assertEquals(SRVS + CLIENTS, ignite.cluster().nodes().size());
+
+            if (topVer == -1L)
+                topVer = ignite.cluster().topologyVersion();
+            else
+                assertEquals(topVer, ignite.cluster().topologyVersion());
+        }
+
+        for (Ignite client : clients)
+            client.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void manyClientsPutGet() throws Exception {
         client = true;
 
@@ -111,7 +190,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
 
         final AtomicBoolean stop = new AtomicBoolean();
 
-        final int THREADS = 30;
+        final int THREADS = 50;
 
         final CountDownLatch latch = new CountDownLatch(THREADS);
 
@@ -143,6 +222,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
                             cache.put(key, iter++);
 
                             assertNotNull(cache.get(key));
+
+                            Thread.sleep(1);
                         }
 
                         log.info("Stopping node: " + ignite.name());
@@ -154,6 +235,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest
{
 
             latch.await();
 
+            log.info("All clients started.");
+
             Thread.sleep(10_000);
 
             log.info("Stop clients.");


Mime
View raw message