ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] incubator-ignite git commit: # ignite-21
Date Wed, 10 Dec 2014 10:49:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java
index 1e61a17..ddd301b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java
@@ -11,6 +11,7 @@ package org.gridgain.grid.util.nio;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.util.*;
@@ -329,7 +330,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
 
-        send0(impl, fut);
+        send0(impl, fut, false);
 
         return fut;
     }
@@ -346,7 +347,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
 
-        send0(impl, fut);
+        send0(impl, fut, false);
 
         return fut;
     }
@@ -354,19 +355,17 @@ public class GridNioServer<T> {
     /**
      * @param ses Session.
      * @param fut Future.
+     * @param sys System message flag.
      */
-    private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut) {
+    private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
         assert ses != null;
         assert fut != null;
 
-        int msgCnt = ses.offerFuture(fut);
+        int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
 
         if (ses.closed()) {
-            NioOperationFuture<?> fut0;
-
-            // Cleanup as session.close() may have been already finished.
-            while ((fut0 = (NioOperationFuture<?>)ses.pollFuture()) != null)
-                fut0.connectionClosed();
+            if (ses.removeFuture(fut))
+                fut.connectionClosed();
         }
         else if (msgCnt == 1)
             // Change from 0 to 1 means that worker thread should be waken up.
@@ -374,6 +373,76 @@ public class GridNioServer<T> {
     }
 
     /**
+     * Adds message at the front of the queue without acquiring back pressure semaphore.
+     *
+     * @param ses Session.
+     * @param msg Message.
+     * @return Future.
+     */
+    public GridNioFuture<?> sendSystem(GridNioSession ses, GridTcpCommunicationMessageAdapter msg) {
+        return sendSystem(ses, msg, null);
+    }
+
+    /**
+     * Adds message at the front of the queue without acquiring back pressure semaphore.
+     *
+     * @param ses Session.
+     * @param msg Message.
+     * @param lsnr Future listener notified from the session thread.
+     * @return Future.
+     */
+    public GridNioFuture<?> sendSystem(GridNioSession ses,
+        GridTcpCommunicationMessageAdapter msg,
+        @Nullable IgniteInClosure<? super GridNioFuture<?>> lsnr) {
+        assert ses instanceof GridSelectorNioSessionImpl;
+
+        GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
+
+        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+
+        if (lsnr != null) {
+            fut.listenAsync(lsnr);
+
+            assert !fut.isDone();
+        }
+
+        send0(impl, fut, true);
+
+        return fut;
+    }
+
+    /**
+     * @param ses Session.
+     */
+    public void resend(GridNioSession ses) {
+        assert ses instanceof GridSelectorNioSessionImpl;
+
+        GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+        if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
+            Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+
+            if (log.isDebugEnabled())
+                log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
+
+            GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+            GridNioFuture<?> fut0 = futs.iterator().next();
+
+            for (GridNioFuture<?> fut : futs) {
+                fut.messageThread(true);
+
+                ((NioOperationFuture)fut).resetMessage(ses0);
+            }
+
+            ses0.resend(futs);
+
+            // Wake up worker.
+            clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+        }
+    }
+
+    /**
      * @param ses Session.
      * @param op Operation.
      * @return Future for operation.
@@ -385,7 +454,8 @@ public class GridNioServer<T> {
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
         if (impl.closed())
-            return new GridNioFinishedFuture(new IOException("Failed to send message (connection was closed): " + ses));
+            return new GridNioFinishedFuture(new IOException("Failed to pause/resume reads " +
+                "(connection was closed): " + ses));
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
 
@@ -406,7 +476,7 @@ public class GridNioServer<T> {
         try {
             ch.configureBlocking(false);
 
-            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, NioOperation.REGISTER, false, meta);
+            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
             offerBalanced(req);
 
@@ -437,7 +507,7 @@ public class GridNioServer<T> {
 
     /**
      * Gets configurable idle timeout for this session. If not set, default value is
-     * {@link org.apache.ignite.configuration.IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}.
+     * {@link IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}.
      *
      * @return Idle timeout in milliseconds.
      */
@@ -1313,9 +1383,16 @@ public class GridNioServer<T> {
                     readBuf.order(order);
                 }
 
-                final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(idx, filterChain,
-                    (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(),
-                    req.accepted(), sndQueueLimit, writeBuf, readBuf);
+                final GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(
+                    log,
+                    idx,
+                    filterChain,
+                    (InetSocketAddress)sockCh.getLocalAddress(),
+                    (InetSocketAddress)sockCh.getRemoteAddress(),
+                    req.accepted(),
+                    sndQueueLimit,
+                    writeBuf,
+                    readBuf);
 
                 Map<Integer, ?> meta = req.meta();
 
@@ -1328,6 +1405,9 @@ public class GridNioServer<T> {
 
                 ses.key(key);
 
+                if (!ses.accepted())
+                    resend(ses);
+
                 sessions.add(ses);
 
                 try {
@@ -1418,11 +1498,27 @@ public class GridNioServer<T> {
                 // Since ses is in closed state, no write requests will be added.
                 NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
 
-                if (fut != null)
-                    fut.connectionClosed();
+                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
 
-                while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
-                    fut.connectionClosed();
+                if (recovery != null) {
+                    try {
+                        // Poll will update recovery data.
+                        while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
+                            if (fut.skipRecovery())
+                                fut.connectionClosed();
+                        }
+                    }
+                    finally {
+                        recovery.release();
+                    }
+                }
+                else {
+                    if (fut != null)
+                        fut.connectionClosed();
+
+                    while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
+                        fut.connectionClosed();
+                }
 
                 return true;
             }
@@ -1669,19 +1765,22 @@ public class GridNioServer<T> {
          * @param sockCh Socket channel to register on selector.
          */
         NioOperationFuture(SocketChannel sockCh) {
-            this(sockCh, NioOperation.REGISTER, true, null);
+            this(sockCh, true, null);
         }
 
         /**
          * @param sockCh Socket channel.
-         * @param op Operation.
          * @param accepted {@code True} if socket has been accepted.
          * @param meta Optional meta.
          */
-        NioOperationFuture(SocketChannel sockCh, NioOperation op, boolean accepted,
-            @Nullable Map<Integer, ?> meta) {
+        NioOperationFuture(
+            SocketChannel sockCh,
+            boolean accepted,
+            @Nullable Map<Integer, ?> meta
+        ) {
+            op = NioOperation.REGISTER;
+
             this.sockCh = sockCh;
-            this.op = op;
             this.accepted = accepted;
             this.meta = meta;
         }
@@ -1761,6 +1860,17 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses New session instance.
+         */
+        private void resetMessage(GridSelectorNioSessionImpl ses) {
+            assert commMsg != null;
+
+            commMsg = commMsg.clone();
+
+            this.ses = ses;
+        }
+
+        /**
          * @return Socket channel for register request.
          */
         private SocketChannel socketChannel() {
@@ -1799,6 +1909,11 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return commMsg != null && commMsg.skipRecovery();
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(NioOperationFuture.class, this);
         }
@@ -1836,9 +1951,9 @@ public class GridNioServer<T> {
         /** {@inheritDoc} */
         @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
             if (directMode) {
-                boolean sslSystem = sslFilter != null && msg instanceof ByteBuffer;
+                boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
-                if (sslSystem) {
+                if (sslSys) {
                     ConcurrentLinkedDeque8<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
 
                     queue.offer((ByteBuffer)msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java
index e1600ba..27acfcc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSession.java
@@ -146,4 +146,14 @@ public interface GridNioSession {
      * @return {@code True} if reads are paused.
      */
     public boolean readsPaused();
+
+    /**
+     * @param recoveryDesc Recovery descriptor.
+     */
+    public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
+     */
+    @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java
index d81f98f..347cb72 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java
@@ -288,6 +288,16 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioSessionImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java
index c44c6a9..db772dc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridSelectorNioSessionImpl.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.util.nio;
 
+import org.apache.ignite.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.grid.util.tostring.*;
 import org.jdk8.backport.*;
@@ -17,6 +18,7 @@ import org.jetbrains.annotations.*;
 import java.net.*;
 import java.nio.*;
 import java.nio.channels.*;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -49,9 +51,16 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Read buffer. */
     private ByteBuffer readBuf;
 
+    /** Recovery data. */
+    private GridNioRecoveryDescriptor recovery;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
     /**
      * Creates session instance.
      *
+     * @param log Logger.
      * @param selectorIdx Selector index for this session.
      * @param filterChain Filter chain that will handle requests.
      * @param locAddr Local address.
@@ -62,6 +71,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param readBuf Read buffer.
      */
     GridSelectorNioSessionImpl(
+        IgniteLogger log,
         int selectorIdx,
         GridNioFilterChain filterChain,
         InetSocketAddress locAddr,
@@ -79,6 +89,10 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address.";
         assert rmtAddr != null : "GridSelectorNioSessionImpl should have remote socket address.";
 
+        assert log != null;
+
+        this.log = log;
+
         this.selectorIdx = selectorIdx;
 
         sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;
@@ -136,6 +150,22 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * Adds write future at the front of the queue without acquiring back pressure semaphore.
+     *
+     * @param writeFut Write request.
+     * @return Updated size of the queue.
+     */
+    int offerSystemFuture(GridNioFuture<?> writeFut) {
+        writeFut.messageThread(true);
+
+        boolean res = queue.offerFirst(writeFut);
+
+        assert res : "Future was not added to queue";
+
+        return queueSize.incrementAndGet();
+    }
+
+    /**
      * Adds write future to the pending list and returns the size of the queue.
      * <p>
      * Note that separate counter for the queue size is needed because in case of concurrent
@@ -161,6 +191,21 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * @param futs Futures to resend.
+     */
+    void resend(Collection<GridNioFuture<?>> futs) {
+        assert queue.isEmpty() : queue.size();
+
+        boolean add = queue.addAll(futs);
+
+        assert add;
+
+        boolean set = queueSize.compareAndSet(0, futs.size());
+
+        assert set;
+    }
+
+    /**
      * @return Message that is in the head of the queue, {@code null} if queue is empty.
      */
     @Nullable GridNioFuture<?> pollFuture() {
@@ -171,12 +216,38 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
             if (sem != null && !last.messageThread())
                 sem.release();
+
+            if (recovery != null) {
+                if (!recovery.add(last)) {
+                    LT.warn(log, null, "Unacknowledged messages queue size overflow, will attempt to reconnect " +
+                        "[remoteAddr=" + remoteAddress() +
+                        ", queueLimit=" + recovery.queueLimit() + ']');
+
+                    if (log.isDebugEnabled())
+                        log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " +
+                            "[remoteAddr=" + remoteAddress() +
+                            ", queueSize=" + recovery.messagesFutures().size() +
+                            ", queueLimit=" + recovery.queueLimit() + ']');
+
+                    close();
+                }
+            }
         }
 
         return last;
     }
 
     /**
+     * @param fut Future.
+     * @return {@code True} if future was removed from queue.
+     */
+    boolean removeFuture(GridNioFuture<?> fut) {
+        assert closed();
+
+        return queue.removeLastOccurrence(fut);
+    }
+
+    /**
      * Gets number of write requests in a queue that have not been processed yet.
      *
      * @return Number of write requests.
@@ -186,6 +257,32 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        assert recoveryDesc != null;
+
+        recovery = recoveryDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+        return recovery;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T addMeta(int key, @Nullable T val) {
+        if (val instanceof GridNioRecoveryDescriptor) {
+            recovery = (GridNioRecoveryDescriptor)val;
+
+            if (!accepted())
+                recovery.connected();
+
+            return null;
+        }
+        else
+            return super.addMeta(key, val);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
index d5dec86..505c788 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
@@ -103,7 +103,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
         throws GridException {
         if (closed())
             throw new GridException("Communication client was closed: " + this);
@@ -120,6 +120,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
         }
 
         markUsed();
+
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
index d1c8c5e..fbca363 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
@@ -182,7 +182,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
     }
 
     /** {@inheritDoc} */
-    @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
         throws GridException {
         if (closed())
             throw new GridException("Client was closed: " + this);
@@ -199,6 +199,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
         }
 
         markUsed();
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
index 6d2a8f7..55997d3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.util.nio;
 
+import org.apache.ignite.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.util.direct.*;
 import org.jetbrains.annotations.*;
@@ -23,27 +24,24 @@ import java.util.*;
  * Grid client for NIO server.
  */
 public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClient {
-    /** Socket. */
+    /** Session. */
     private final GridNioSession ses;
 
-    /**
-     * Constructor for test purposes only.
-     */
-    public GridTcpNioCommunicationClient() {
-        super(null);
-
-        ses = null;
-    }
+    /** Logger. */
+    private final IgniteLogger log;
 
     /**
      * @param ses Session.
+     * @param log Logger.
      */
-    public GridTcpNioCommunicationClient(GridNioSession ses) {
+    public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) {
         super(null);
 
         assert ses != null;
+        assert log != null;
 
         this.ses = ses;
+        this.log = log;
     }
 
     /**
@@ -98,14 +96,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
         throws GridException {
         // Node ID is never provided in asynchronous send mode.
         assert nodeId == null;
 
-        if (closed())
-            throw new GridException("Client was closed: " + this);
-
         GridNioFuture<?> fut = ses.send(msg);
 
         if (fut.isDone()) {
@@ -113,9 +108,23 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
                 fut.get();
             }
             catch (IOException e) {
-                throw new GridException("Failed to send message [client=" + this + ']', e);
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message [client=" + this + ", err=" +e + ']');
+
+                return true;
+            }
+            catch (GridException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message [client=" + this + ", err=" +e + ']');
+
+                if (e.getCause() instanceof IOException)
+                    return true;
+                else
+                    throw new GridException("Failed to send message [client=" + this + ']', e);
             }
         }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index dc2ee2a..86d68a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -9,20 +9,22 @@
 
 package org.apache.ignite.spi.communication;
 
-import mx4j.tools.adaptor.http.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
-import org.gridgain.testframework.config.*;
 import org.gridgain.testframework.junits.*;
 import org.gridgain.testframework.junits.spi.*;
 
-import javax.management.*;
+import java.net.*;
 import java.util.*;
 import java.util.Map.*;
 
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+
 /**
  * Super class for all communication self tests.
  * @param <T> Type of communication SPI.
@@ -47,22 +49,15 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     /** */
     private static final Object mux = new Object();
 
-    /** */
-    private static final ObjectName mBeanName;
-
+    /**
+     *
+     */
     static {
         GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
             @Override public GridTcpCommunicationMessageAdapter create(byte type) {
                 return new GridTestMessage();
             }
         }, GridTestMessage.DIRECT_TYPE);
-
-        try {
-            mBeanName = new ObjectName("mbeanAdaptor:protocol=HTTP");
-        }
-        catch (MalformedObjectNameException e) {
-            throw new GridRuntimeException(e);
-        }
     }
 
     /** */
@@ -237,6 +232,36 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis();
+
+                break;
+            }
+            catch (GridException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                        afterTestsStopped();
+
+                        U.sleep(30_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void startSpis() throws Exception {
+        U.setWorkDirectory(null, U.getGridGainHome());
+
         spis.clear();
         nodes.clear();
         spiRsrcs.clear();
@@ -246,10 +271,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = getSpi(i);
 
-            GridTestResources rsrcs = new GridTestResources(getMBeanServer(i));
+            GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i);
+
+            GridTestResources rsrcs = new GridTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
+            node.order(i);
+
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
@@ -263,6 +292,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             spi.setListener(new MessageListener(rsrcs.getNodeId()));
 
             node.setAttributes(spi.getNodeAttributes());
+            node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
 
             nodes.add(node);
 
@@ -284,38 +314,17 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
         }
     }
 
-    /**
-     * @param idx Node index.
-     * @return Configured MBean server.
-     * @throws Exception If failed.
-     */
-    private MBeanServer getMBeanServer(int idx) throws Exception {
-        HttpAdaptor mbeanAdaptor = new HttpAdaptor();
-
-        MBeanServer mbeanSrv = MBeanServerFactory.createMBeanServer();
-
-        mbeanAdaptor.setPort(
-            Integer.valueOf(GridTestProperties.getProperty("comm.mbeanserver.selftest.baseport")) + idx);
-
-        mbeanSrv.registerMBean(mbeanAdaptor, mBeanName);
-
-        mbeanAdaptor.start();
-
-        return mbeanSrv;
-    }
-
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) {
+            spi.onContextDestroyed();
+
             spi.setListener(null);
 
             spi.spiStop();
         }
 
-        for (GridTestResources rsrcs : spiRsrcs) {
+        for (GridTestResources rsrcs : spiRsrcs)
             rsrcs.stopThreads();
-
-            rsrcs.getMBeanServer().unregisterMBean(mBeanName);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
index 8015176..58cb184 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
@@ -86,14 +86,23 @@ public class GridTestMessage extends GridTcpCommunicationMessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("CloneDoesntCallSuperClone")
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public GridTcpCommunicationMessageAdapter clone() {
-        throw new UnsupportedOperationException();
+        GridTestMessage msg = new GridTestMessage();
+
+        clone0(msg);
+
+        return msg;
     }
 
     /** {@inheritDoc} */
     @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        // No-op.
+        GridTestMessage _clone = (GridTestMessage)_msg;
+
+        _clone.srcNodeId = srcNodeId;
+        _clone.msgId = msgId;
+        _clone.resId = resId;
+        _clone.payload = payload;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 16c7ea0..1dabf5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -29,9 +29,6 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     public static final int IDLE_CONN_TIMEOUT = 2000;
 
     /** */
-    private boolean tcpNoDelay;
-
-    /** */
     private final boolean useShmem;
 
     /**
@@ -50,11 +47,18 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
 
         spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
         spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT);
-        spi.setTcpNoDelay(tcpNoDelay);
+        spi.setTcpNoDelay(tcpNoDelay());
 
         return spi;
     }
 
+    /**
+     * @return Value of property '{@link TcpCommunicationSpi#isTcpNoDelay()}'.
+     */
+    protected boolean tcpNoDelay() {
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override protected int getSpiCount() {
         return SPI_COUNT;
@@ -68,21 +72,12 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         for (CommunicationSpi spi : spis.values()) {
             ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
 
-            assert clients.size() == 2;
+            assertEquals(2, clients.size());
 
             clients.put(UUID.randomUUID(), F.first(clients.values()));
         }
     }
 
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTcpNoDelay() throws Exception {
-        tcpNoDelay = true;
-
-        super.testSendToManyNodes();
-    }
-
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -91,7 +86,8 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
             ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
 
             for (int i = 0; i < 10 && !clients.isEmpty(); i++) {
-                U.warn(log, "Check failed for SPI: " + spi);
+                info("Check failed for SPI [grid=" + GridTestUtils.getFieldValue(spi, "gridName") +
+                    ", spi=" + spi + ']');
 
                 U.sleep(1000);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
new file mode 100644
index 0000000..ffe5d57
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -0,0 +1,398 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.*;
+import org.eclipse.jetty.util.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.nio.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.*;
+import org.gridgain.testframework.junits.spi.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi>
+    extends GridSpiAbstractTest<T> {
+    /** */
+    private static final int SPI_CNT = 2;
+
+    /** */
+    private static final int ITERS = 50;
+
+    /** */
+    private static final Collection<GridTestResources> spiRsrcs = new ArrayList<>();
+
+    /** */
+    protected static final List<CommunicationSpi<GridTcpCommunicationMessageAdapter>> spis = new ArrayList<>();
+
+    /** */
+    protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+    /** */
+    private static int port = 60_000;
+
+    /**
+     *
+     */
+    static {
+        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
+            @Override
+            public GridTcpCommunicationMessageAdapter create(byte type) {
+                return new GridTestMessage();
+            }
+        }, GridTestMessage.DIRECT_TYPE);
+    }
+
+    /**
+     * Disable SPI auto-start.
+     */
+    public GridTcpCommunicationSpiConcurrentConnectSelfTest() {
+        super(false);
+    }
+
+    /**
+     *
+     */
+    private static class MessageListener implements CommunicationListener<GridTcpCommunicationMessageAdapter> {
+        /** */
+        private final CountDownLatch latch;
+
+        /** */
+        private final AtomicInteger cntr = new AtomicInteger();
+
+        /** */
+        private final ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+        /**
+         * @param latch Latch.
+         */
+        MessageListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, GridTcpCommunicationMessageAdapter msg, IgniteRunnable msgC) {
+            msgC.run();
+
+            assertTrue(msg instanceof GridTestMessage);
+
+            cntr.incrementAndGet();
+
+            GridTestMessage msg0 = (GridTestMessage)msg;
+
+            assertEquals(nodeId, msg0.getSourceNodeId());
+
+            assertTrue(msgIds.add(msg0.getMsgId()));
+
+            latch.countDown();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(UUID nodeId) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoThreads() throws Exception {
+        concurrentConnect(2, 10, ITERS, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded() throws Exception {
+        int threads = Runtime.getRuntime().availableProcessors() * 5;
+
+        concurrentConnect(threads, 10, ITERS, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithLoad() throws Exception {
+        int threads = Runtime.getRuntime().availableProcessors() * 5;
+
+        concurrentConnect(threads, 10, ITERS / 2, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomSleep() throws Exception {
+        concurrentConnect(4, 1, ITERS, true, false);
+    }
+
+    /**
+     * @param threads Number of threads.
+     * @param msgPerThread Messages per thread.
+     * @param iters Number of iterations.
+     * @param sleep If {@code true} sleeps random time before starts send messages.
+     * @param load Run load threads flag.
+     * @throws Exception If failed.
+     */
+    private void concurrentConnect(final int threads,
+        final int msgPerThread,
+        final int iters,
+        final boolean sleep,
+        boolean load) throws Exception {
+        log.info("Concurrent connect [threads=" + threads +
+            ", msgPerThread=" + msgPerThread +
+            ", iters=" + iters +
+            ", load=" + load +
+            ", sleep=" + sleep + ']');
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteFuture<?> loadFut = null;
+
+        if (load) {
+            loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                    long dummyRes = 0;
+
+                    List<String> list = new ArrayList<>();
+
+                    while (!stop.get()) {
+                        for (int i = 0; i < 100; i++) {
+                            String str = new String(new byte[i]);
+
+                            list.add(str);
+
+                            dummyRes += str.hashCode();
+                        }
+
+                        if (list.size() > 1000_000) {
+                            list = new ArrayList<>();
+
+                            System.gc();
+                        }
+                    }
+
+                    return dummyRes;
+                }
+            }, 2, "test-load");
+        }
+
+        try {
+            for (int i = 0; i < iters; i++) {
+                log.info("Iteration: " + i);
+
+                final AtomicInteger msgId = new AtomicInteger();
+
+                final int expMsgs = threads * msgPerThread;
+
+                CountDownLatch latch = new CountDownLatch(expMsgs);
+
+                MessageListener lsnr = new MessageListener(latch);
+
+                createSpis(lsnr);
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                try {
+                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            int idx0 = idx.getAndIncrement();
+
+                            Thread.currentThread().setName("Test thread [idx=" + idx0 + ", grid=" + (idx0 % 2) + ']');
+
+                            CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = spis.get(idx0 % 2);
+
+                            ClusterNode srcNode = nodes.get(idx0 % 2);
+
+                            ClusterNode dstNode = nodes.get((idx0 + 1) % 2);
+
+                            if (sleep) {
+                                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                                long millis = rnd.nextLong(10);
+
+                                if (millis > 0)
+                                    Thread.sleep(millis);
+                            }
+
+                            for (int i = 0; i < msgPerThread; i++)
+                                spi.sendMessage(dstNode, new GridTestMessage(srcNode.id(), msgId.incrementAndGet(), 0));
+
+                            return null;
+                        }
+                    }, threads, "test");
+
+                    assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+                    for (CommunicationSpi spi : spis) {
+                        ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+
+                        assertEquals(1, clients.size());
+
+                        final GridNioServer srv = U.field(spi, "nioSrvr");
+
+                        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                Collection sessions = U.field(srv, "sessions");
+
+                                return sessions.size() == 1;
+                            }
+                        }, 5000);
+
+                        Collection sessions = U.field(srv, "sessions");
+
+                        assertEquals(1, sessions.size());
+                    }
+
+                    assertEquals(expMsgs, lsnr.cntr.get());
+                }
+                finally {
+                    stopSpis();
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+
+            if (loadFut != null)
+                loadFut.get();
+        }
+    }
+
+    /**
+     * @return SPI.
+     */
+    private CommunicationSpi createSpi() {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setLocalAddress("127.0.0.1");
+        spi.setSharedMemoryPort(-1);
+        spi.setLocalPort(port++);
+        spi.setIdleConnectionTimeout(60_000);
+        spi.setConnectTimeout(10_000);
+
+        return spi;
+    }
+
+    /**
+     * @param lsnr Message listener.
+     * @throws Exception If failed.
+     */
+    private void startSpis(MessageListener lsnr) throws Exception {
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+        for (int i = 0; i < SPI_CNT; i++) {
+            CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = createSpi();
+
+            GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i);
+
+            GridTestResources rsrcs = new GridTestResources();
+
+            GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+            node.order(i + 1);
+
+            GridSpiTestContext ctx = initSpiContext();
+
+            ctx.setLocalNode(node);
+
+            info(">>> Initialized context: nodeId=" + ctx.localNode().id());
+
+            spiRsrcs.add(rsrcs);
+
+            rsrcs.inject(spi);
+
+            spi.setListener(lsnr);
+
+            node.setAttributes(spi.getNodeAttributes());
+
+            nodes.add(node);
+
+            spi.spiStart(getTestGridName() + (i + 1));
+
+            spis.add(spi);
+
+            spi.onContextInitialized(ctx);
+
+            ctxs.put(node, ctx);
+        }
+
+        // For each context set remote nodes.
+        for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+            for (ClusterNode n : nodes) {
+                if (!n.equals(e.getKey()))
+                    e.getValue().remoteNodes().add(n);
+            }
+        }
+    }
+
+    /**
+     * @param lsnr Message listener.
+     * @throws Exception If failed.
+     */
+    private void createSpis(MessageListener lsnr) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(lsnr);
+
+                break;
+            }
+            catch (GridException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                        stopSpis();
+
+                        U.sleep(10_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void stopSpis() throws Exception {
+        for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis) {
+            spi.onContextDestroyed();
+
+            spi.setListener(null);
+
+            spi.spiStop();
+        }
+
+        for (GridTestResources rsrcs : spiRsrcs)
+            rsrcs.stopThreads();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 833c20d..ba34ddd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -37,5 +37,9 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "bufferSizeRatio", 0);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectTimeout", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "maxConnectTimeout", -1);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketWriteTimeout", -1);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", 0);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), "ackSendThreshold", -1);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), "unacknowledgedMessagesBufferSize", -1);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index c49f104..0d650fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -9,27 +9,27 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
-import mx4j.tools.adaptor.http.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.*;
 import org.apache.ignite.spi.communication.*;
 import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.nio.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
-import org.gridgain.testframework.config.*;
 import org.gridgain.testframework.junits.*;
 import org.gridgain.testframework.junits.spi.*;
 import org.jdk8.backport.*;
 
-import javax.management.*;
 import java.util.*;
 import java.util.Map.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+
 /**
  * Class for multithreaded {@link TcpCommunicationSpi} test.
  */
@@ -60,9 +60,6 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
     /** Initialized nodes */
     private static final List<ClusterNode> nodes = new ArrayList<>();
 
-    /** */
-    private static final ObjectName mBeanName;
-
     /** Flag indicating if listener should reject messages. */
     private static boolean reject;
 
@@ -72,13 +69,6 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
                 return new GridTestMessage();
             }
         }, GridTestMessage.DIRECT_TYPE);
-
-        try {
-            mBeanName = new ObjectName("mbeanAdaptor:protocol=HTTP");
-        }
-        catch (MalformedObjectNameException e) {
-            throw new GridRuntimeException(e);
-        }
     }
 
     /**
@@ -168,8 +158,7 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
 
         assertEquals("Invalid listener count", getSpiCount(), lsnrs.size());
 
-        final ConcurrentMap<UUID, ConcurrentLinkedDeque8<GridTestMessage>> msgs =
-            new ConcurrentHashMap<>();
+        final ConcurrentMap<UUID, ConcurrentLinkedDeque8<GridTestMessage>> msgs = new ConcurrentHashMap<>();
 
         final int iterationCnt = 5000;
 
@@ -331,6 +320,28 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
         run.set(false);
 
         fut2.get();
+
+        // Wait when all messages are acknowledged to do not break next tests' logic.
+        for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) {
+            GridNioServer srv = U.field(spi, "nioSrvr");
+
+            Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+            for (GridNioSession ses : sessions) {
+                final GridNioRecoveryDescriptor snd = ses.recoveryDescriptor();
+
+                if (snd != null) {
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return snd.messagesFutures().isEmpty();
+                        }
+                    }, 10_000);
+
+                    assertEquals("Unexpected messages: " + snd.messagesFutures(), 0,
+                        snd.messagesFutures().size());
+                }
+            }
+        }
     }
 
     /**
@@ -415,6 +426,8 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
+        U.setWorkDirectory(null, U.getGridGainHome());
+
         spis.clear();
         nodes.clear();
         spiRsrcs.clear();
@@ -425,10 +438,14 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<GridTcpCommunicationMessageAdapter> spi = newCommunicationSpi();
 
-            GridTestResources rsrcs = new GridTestResources(getMBeanServer(i));
+            GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i);
+
+            GridTestResources rsrcs = new GridTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
+            node.order(i);
+
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
@@ -448,6 +465,7 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
             info("Lsnrs: " + lsnrs);
 
             node.setAttributes(spi.getNodeAttributes());
+            node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
 
             nodes.add(node);
 
@@ -491,40 +509,19 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS
         }
     }
 
-    /**
-     * @param idx Node index.
-     * @return Configured MBean server.
-     * @throws Exception If failed.
-     */
-    private MBeanServer getMBeanServer(int idx) throws Exception {
-        HttpAdaptor mbeanAdaptor = new HttpAdaptor();
-
-        MBeanServer mbeanSrv = MBeanServerFactory.createMBeanServer();
-
-        mbeanAdaptor.setPort(
-            Integer.valueOf(GridTestProperties.getProperty("comm.mbeanserver.selftest.baseport")) + idx);
-
-        mbeanSrv.registerMBean(mbeanAdaptor, mBeanName);
-
-        mbeanAdaptor.start();
-
-        return mbeanSrv;
-    }
-
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis.values()) {
+            spi.onContextDestroyed();
+
             spi.setListener(null);
 
             spi.spiStop();
         }
 
-        for (GridTestResources rsrcs : spiRsrcs) {
+        for (GridTestResources rsrcs : spiRsrcs)
             rsrcs.stopThreads();
 
-            rsrcs.getMBeanServer().unregisterMBean(mBeanName);
-        }
-
         lsnrs.clear();
         spiRsrcs.clear();
         spis.clear();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
index 98e614e..70334ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java
@@ -15,6 +15,6 @@ package org.apache.ignite.spi.communication.tcp;
 public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest {
     /** */
     public GridTcpCommunicationSpiMultithreadedShmemTest() {
-        super(true);
+        super(false);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dae4b942/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
new file mode 100644
index 0000000..b20e50e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -0,0 +1,426 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.*;
+import org.eclipse.jetty.util.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.nio.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.*;
+import org.gridgain.testframework.junits.spi.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+    /** */
+    private static final Collection<GridTestResources> spiRsrcs = new ArrayList<>();
+
+    /** */
+    protected static final List<TcpCommunicationSpi> spis = new ArrayList<>();
+
+    /** */
+    protected static final List<ClusterNode> nodes = new ArrayList<>();
+
+    /** */
+    private static final int SPI_CNT = 2;
+
+    /**
+     *
+     */
+    static {
+        GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() {
+            @Override
+            public GridTcpCommunicationMessageAdapter create(byte type) {
+                return new GridTestMessage();
+            }
+        }, GridTestMessage.DIRECT_TYPE);
+    }
+
+    /**
+     * Disable SPI auto-start.
+     */
+    public GridTcpCommunicationSpiRecoveryAckSelfTest() {
+        super(false);
+    }
+
+    /** */
+    @SuppressWarnings({"deprecation"})
+    private class TestListener implements CommunicationListener<GridTcpCommunicationMessageAdapter> {
+        /** */
+        private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
+
+        /** */
+        private AtomicInteger rcvCnt = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, GridTcpCommunicationMessageAdapter msg, IgniteRunnable msgC) {
+            info("Test listener received message: " + msg);
+
+            assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
+
+            GridTestMessage msg0 = (GridTestMessage)msg;
+
+            assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId()));
+
+            rcvCnt.incrementAndGet();
+
+            msgC.run();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(UUID nodeId) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnIdle() throws Exception {
+        checkAck(10, 2000, 9);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAckOnCount() throws Exception {
+        checkAck(10, 60_000, 10);
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param msgPerIter Messages per iteration.
+     * @throws Exception If failed.
+     */
+    private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception {
+        createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT);
+
+        try {
+            TcpCommunicationSpi spi0 = spis.get(0);
+            TcpCommunicationSpi spi1 = spis.get(1);
+
+            ClusterNode node0 = nodes.get(0);
+            ClusterNode node1 = nodes.get(1);
+
+            int msgId = 0;
+
+            int expMsgs = 0;
+
+            for (int i = 0; i < 5; i++) {
+                info("Iteration: " + i);
+
+                for (int j = 0; j < msgPerIter; j++) {
+                    spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+                    spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
+                }
+
+                expMsgs += msgPerIter;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    GridNioServer srv = U.field(spi, "nioSrvr");
+
+                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                    assertFalse(sessions.isEmpty());
+
+                    boolean found = false;
+
+                    for (GridNioSession ses : sessions) {
+                        final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+
+                        if (recoveryDesc != null) {
+                            found = true;
+
+                            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
+                                    return recoveryDesc.messagesFutures().isEmpty();
+                                }
+                            }, 10_000);
+
+                            assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
+                                recoveryDesc.messagesFutures().size());
+
+                            break;
+                        }
+                    }
+
+                    assertTrue(found);
+                }
+
+                final int expMsgs0 = expMsgs;
+
+                for (TcpCommunicationSpi spi : spis) {
+                    final TestListener lsnr = (TestListener)spi.getListener();
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override
+                        public boolean apply() {
+                            return lsnr.rcvCnt.get() >= expMsgs0;
+                        }
+                    }, 5000);
+
+                    assertEquals(expMsgs, lsnr.rcvCnt.get());
+                }
+            }
+        }
+        finally {
+            stopSpis();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueOverflow() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(5, 60_000, 10);
+
+                checkOverflow();
+
+                break;
+            }
+            catch (GridException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Got exception caused by BindException, will retry after delay: " + e);
+
+                        stopSpis();
+
+                        U.sleep(10_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+            finally {
+                stopSpis();
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkOverflow() throws Exception {
+        TcpCommunicationSpi spi0 = spis.get(0);
+        TcpCommunicationSpi spi1 = spis.get(1);
+
+        ClusterNode node0 = nodes.get(0);
+        ClusterNode node1 = nodes.get(1);
+
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+
+        int msgId = 0;
+
+        // Send message to establish connection.
+        spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        // Prevent node1 from send
+        GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+
+        final GridNioSession ses0 = communicationSession(spi0);
+
+        for (int i = 0; i < 150; i++)
+            spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        // Wait when session is closed because of queue overflow.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return ses0.closeTime() != 0;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
+
+        GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+
+        for (int i = 0; i < 100; i++)
+            spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+
+        final int expMsgs = 251;
+
+        final TestListener lsnr = (TestListener)spi1.getListener();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                return lsnr.rcvCnt.get() >= expMsgs;
+            }
+        }, 5000);
+
+        assertEquals(expMsgs, lsnr.rcvCnt.get());
+    }
+
+    /**
+     * @param spi SPI.
+     * @return Session.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
+        final GridNioServer srv = U.field(spi, "nioSrvr");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+                return !sessions.isEmpty();
+            }
+        }, 5000);
+
+        Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+        assertEquals(1, sessions.size());
+
+        return sessions.iterator().next();
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @return SPI instance.
+     */
+    protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setSharedMemoryPort(-1);
+        spi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        spi.setIdleConnectionTimeout(idleTimeout);
+        spi.setTcpNoDelay(true);
+        spi.setAckSendThreshold(ackCnt);
+        spi.setMessageQueueLimit(queueLimit);
+
+        return spi;
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+
+        Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+
+        for (int i = 0; i < SPI_CNT; i++) {
+            TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
+
+            GridTestUtils.setFieldValue(spi, "gridName", "grid-" + i);
+
+            GridTestResources rsrcs = new GridTestResources();
+
+            GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+
+            GridSpiTestContext ctx = initSpiContext();
+
+            ctx.setLocalNode(node);
+
+            spiRsrcs.add(rsrcs);
+
+            rsrcs.inject(spi);
+
+            spi.setListener(new TestListener());
+
+            node.setAttributes(spi.getNodeAttributes());
+
+            nodes.add(node);
+
+            spi.spiStart(getTestGridName() + (i + 1));
+
+            spis.add(spi);
+
+            spi.onContextInitialized(ctx);
+
+            ctxs.put(node, ctx);
+        }
+
+        // For each context set remote nodes.
+        for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) {
+            for (ClusterNode n : nodes) {
+                if (!n.equals(e.getKey()))
+                    e.getValue().remoteNodes().add(n);
+            }
+        }
+    }
+
+    /**
+     * @param ackCnt Recovery acknowledgement count.
+     * @param idleTimeout Idle connection timeout.
+     * @param queueLimit Message queue limit.
+     * @throws Exception If failed.
+     */
+    private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            try {
+                startSpis(ackCnt, idleTimeout, queueLimit);
+
+                break;
+            }
+            catch (GridException e) {
+                if (e.hasCause(BindException.class)) {
+                    if (i < 2) {
+                        info("Failed to start SPIs because of BindException, will retry after delay.");
+
+                        stopSpis();
+
+                        U.sleep(10_000);
+                    }
+                    else
+                        throw e;
+                }
+                else
+                    throw e;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void stopSpis() throws Exception {
+        for (CommunicationSpi<GridTcpCommunicationMessageAdapter> spi : spis) {
+            spi.onContextDestroyed();
+
+            spi.setListener(null);
+
+            spi.spiStop();
+        }
+
+        for (GridTestResources rsrcs : spiRsrcs)
+            rsrcs.stopThreads();
+
+        spis.clear();
+        nodes.clear();
+        spiRsrcs.clear();
+    }
+}


Mime
View raw message