ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4003 Changes in GridNioServer for async connect
Date Thu, 24 Aug 2017 12:17:14 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4003-1 [created] eaf16c99e


ignite-4003 Changes in GridNioServer for async connect


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eaf16c99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eaf16c99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eaf16c99

Branch: refs/heads/ignite-4003-1
Commit: eaf16c99ef88635e2b950ef5964d9a0bd1299dd1
Parents: e1bf8d7
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Aug 24 15:17:04 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Aug 24 15:17:04 2017 +0300

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |   1 -
 .../connection/GridClientNioTcpConnection.java  |   2 +-
 .../internal/util/GridSpinReadWriteLock.java    |   2 +-
 .../internal/util/nio/GridNioKeyAttachment.java |  32 +++
 .../util/nio/GridNioRecoveryDescriptor.java     |   3 +-
 .../ignite/internal/util/nio/GridNioServer.java | 248 +++++++++++++++----
 .../util/nio/GridSelectorNioSessionImpl.java    |  28 +--
 .../internal/util/nio/ssl/GridNioSslFilter.java |  12 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   4 +-
 .../IgniteCacheMessageWriteTimeoutTest.java     |   4 +-
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |   1 +
 .../GridAbstractCommunicationSelfTest.java      |  27 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |  28 ++-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |  39 ++-
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  47 +++-
 ...CommunicationRecoveryAckClosureSelfTest.java |  36 ++-
 .../tcp/TcpCommunicationSpiDropNodesTest.java   |   3 +-
 .../HadoopExternalCommunication.java            |   5 +-
 19 files changed, 427 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index e325897..aa06322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
                     GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
 
                     sslFilter.directMode(false);
-                    sslFilter.clientMode(true);
 
                     filters = new GridNioFilter[]{codecFilter, sslFilter};
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 18ce6e4..f72a009 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -235,7 +235,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }
 
-            ses = (GridNioSession)srv.createSession(ch, meta).get();
+            ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
 
             if (sslHandshakeFut != null)
                 sslHandshakeFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
     private int writeLockEntryCnt;
 
     /**
-     * Acquires write lock.
+     * Acquires read lock.
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
new file mode 100644
index 0000000..c3f6bb2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+interface GridNioKeyAttachment {
+    boolean hasSession();
+
+    /**
+     * @return Session if it was created.
+     */
+    @Nullable GridSelectorNioSessionImpl session();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 7496728..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,7 +31,6 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Recovery information for single node.
  */
-@Deprecated // To be splitted into separate classes for in/out data when do not need maintain backward compatibility.
 public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;
@@ -260,7 +259,7 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param node Node.
-     * @return {@code True} if node is not null and has the same order as initial remtoe node.
+     * @return {@code True} if node is not null and has the same order as initial remote node.
      */
     public boolean nodeAlive(@Nullable ClusterNode node) {
         return node != null && node.order() == this.node.order();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a04a735..0dd7dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -113,6 +113,12 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Session future meta key. */
+    public static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Selection key meta key. */
+    private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -465,7 +471,7 @@ public class GridNioServer<T> {
      * @return Future for operation.
      */
     public GridNioFuture<Boolean> close(GridNioSession ses) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
@@ -811,17 +817,32 @@ public class GridNioServer<T> {
      *
      * @param ch Channel to register within the server and create session for.
      * @param meta Optional meta for new session.
+     * @param async Async connection.
+     * @param lsnr Listener that should be invoked in NIO thread.
      * @return Future to get session.
      */
-    public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
-        @Nullable Map<Integer, ?> meta) {
+    public GridNioFuture<GridNioSession> createSession(
+        final SocketChannel ch,
+        @Nullable Map<Integer, Object> meta,
+        boolean async,
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+    ) {
         try {
             if (!closed) {
                 ch.configureBlocking(false);
 
                 NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
-                offerBalanced(req);
+                if (async) {
+                    assert meta != null;
+
+                    req.op = NioOperation.CONNECT;
+                }
+
+                if (lsnr != null)
+                    req.listen(lsnr);
+
+                offerBalanced(req, meta);
 
                 return req;
             }
@@ -835,6 +856,29 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param ch Channel.
+     * @param meta Session meta.
+     */
+    public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+        if (!closed) {
+            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+            req.op = NioOperation.CANCEL_CONNECT;
+
+            Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+            assert idx != null : meta;
+
+            clientWorkers.get(idx).offer(req);
+
+            return req;
+        }
+        else
+            return new GridNioFinishedFuture<>(
+                new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+    }
+
+    /**
      * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
      *
      * @return Write timeout in milliseconds.
@@ -920,9 +964,10 @@ public class GridNioServer<T> {
 
     /**
      * @param req Request to balance.
+     * @param meta Session metadata.
      */
-    private synchronized void offerBalanced(NioOperationFuture req) {
-        assert req.operation() == NioOperation.REGISTER : req;
+    private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+        assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT : req;
         assert req.socketChannel() != null : req;
 
         int workers = clientWorkers.size();
@@ -960,6 +1005,9 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
+        if (meta != null)
+            meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
         clientWorkers.get(balanceIdx).offer(req);
     }
 
@@ -1788,6 +1836,38 @@ public class GridNioServer<T> {
 
                     while ((req0 = changeReqs.poll()) != null) {
                         switch (req0.operation()) {
+                            case CONNECT: {
+                                NioOperationFuture fut = (NioOperationFuture)req0;
+
+                                SocketChannel ch = fut.socketChannel();
+
+                                try {
+                                    ch.register(selector, SelectionKey.OP_CONNECT, fut);
+                                }
+                                catch (IOException e) {
+                                    fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+                                }
+
+                                break;
+                            }
+
+                            case CANCEL_CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                SelectionKey key = ch.keyFor(selector);
+
+                                if (key != null)
+                                    key.cancel();
+
+                                U.closeQuiet(ch);
+
+                                req.onDone();
+
+                                break;
+                            }
+
                             case REGISTER: {
                                 register((NioOperationFuture)req0);
 
@@ -1998,8 +2078,12 @@ public class GridNioServer<T> {
                         log.debug("Closing all connected client sockets.");
 
                     // Close all channels registered with selector.
-                    for (SelectionKey key : selector.keys())
-                        close((GridSelectorNioSessionImpl)key.attachment(), null);
+                    for (SelectionKey key : selector.keys()) {
+                        GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
+
+                        if (attach != null && attach.hasSession())
+                            close(attach.session(), null);
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Closing NIO selector.");
@@ -2173,11 +2257,17 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
 
-                assert ses != null;
+                assert attach != null;
 
                 try {
+                    if (!attach.hasSession() && key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2196,7 +2286,12 @@ public class GridNioServer<T> {
                         // No-op.
                     }
 
-                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    GridSelectorNioSessionImpl ses = attach.session();
+
+                    if (!closed)
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
 
                     close(ses, new GridNioException(e));
                 }
@@ -2225,11 +2320,17 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
 
-                assert ses != null;
+                assert attach != null;
 
                 try {
+                    if (!attach.hasSession() && key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2248,10 +2349,12 @@ public class GridNioServer<T> {
                         // No-op.
                     }
 
-                    if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    GridSelectorNioSessionImpl ses = attach.session();
 
-                    close(ses, new GridNioException(e));
+                    if (!closed)
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
                 }
             }
         }
@@ -2265,7 +2368,12 @@ public class GridNioServer<T> {
             long now = U.currentTimeMillis();
 
             for (SelectionKey key : keys) {
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
+
+                if (attach == null || !attach.hasSession())
+                    continue;
+
+                GridSelectorNioSessionImpl ses = attach.session();
 
                 try {
                     long writeTimeout0 = writeTimeout;
@@ -2303,12 +2411,12 @@ public class GridNioServer<T> {
         /**
          * Registers given socket channel to the selector, creates a session and notifies the listener.
          *
-         * @param req Registration request.
+         * @param fut Registration future.
          */
-        private void register(NioOperationFuture<GridNioSession> req) {
-            assert req != null;
+        private void register(NioOperationFuture<GridNioSession> fut) {
+            assert fut != null;
 
-            SocketChannel sockCh = req.socketChannel();
+            SocketChannel sockCh = fut.socketChannel();
 
             assert sockCh != null;
 
@@ -2334,24 +2442,53 @@ public class GridNioServer<T> {
                     filterChain,
                     (InetSocketAddress)sockCh.getLocalAddress(),
                     (InetSocketAddress)sockCh.getRemoteAddress(),
-                    req.accepted(),
+                    fut.accepted(),
                     sndQueueLimit,
                     writeBuf,
                     readBuf);
 
-                Map<Integer, ?> meta = req.meta();
+                Map<Integer, ?> meta = fut.meta();
 
                 if (meta != null) {
                     for (Entry<Integer, ?> e : meta.entrySet())
                         ses.addMeta(e.getKey(), e.getValue());
+
+                    if (!ses.accepted()) {
+                        GridNioRecoveryDescriptor desc =
+                            (GridNioRecoveryDescriptor)meta.get(RECOVERY_DESC_META_KEY);
+
+                        if (desc != null) {
+                            ses.outRecoveryDescriptor(desc);
+
+                            if (!desc.pairedConnections())
+                                ses.inRecoveryDescriptor(desc);
+                        }
+                    }
                 }
 
-                SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                SelectionKey key;
 
-                ses.key(key);
+                if (!sockCh.isRegistered()) {
+                    assert fut.op == NioOperation.REGISTER : fut.op;
+
+                    key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+
+                    ses.key(key);
 
-                if (!ses.accepted())
                     resend(ses);
+                }
+                else {
+                    assert fut.op == NioOperation.CONNECT : fut.op;
+
+                    key = sockCh.keyFor(selector);
+
+                    key.attach(ses);
+
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+
+                    ses.key(key);
+                }
 
                 sessions.add(ses);
                 workerSessions.add(ses);
@@ -2359,12 +2496,12 @@ public class GridNioServer<T> {
                 try {
                     filterChain.onSessionOpened(ses);
 
-                    req.onDone(ses);
+                    fut.onDone(ses);
                 }
                 catch (IgniteCheckedException e) {
                     close(ses, e);
 
-                    req.onDone(e);
+                    fut.onDone(e);
                 }
 
                 if (closed)
@@ -2486,6 +2623,31 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Key.
+         * @throws IOException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void processConnect(SelectionKey key) throws IOException {
+            SocketChannel ch = (SocketChannel)key.channel();
+
+            NioOperationFuture<GridNioSession> sesFut = (NioOperationFuture<GridNioSession>)key.attachment();
+
+            assert sesFut != null;
+
+            try {
+                if (ch.finishConnect())
+                    register(sesFut);
+            }
+            catch (IOException e) {
+                U.closeQuiet(ch);
+
+                sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+                throw e;
+            }
+        }
+
+        /**
          * Processes read-available event on the key.
          *
          * @param key Key that is ready to be read.
@@ -2690,7 +2852,7 @@ public class GridNioServer<T> {
                     if (log.isDebugEnabled())
                         log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress());
 
-                    addRegistrationReq(sockCh);
+                    addRegistrationRequest(sockCh);
                 }
             }
         }
@@ -2701,15 +2863,21 @@ public class GridNioServer<T> {
          *
          * @param sockCh Socket channel to be registered on one of the selectors.
          */
-        private void addRegistrationReq(SocketChannel sockCh) {
-            offerBalanced(new NioOperationFuture(sockCh));
+        private void addRegistrationRequest(SocketChannel sockCh) {
+            offerBalanced(new NioOperationFuture<>(sockCh, true, null), null);
         }
     }
 
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    enum NioOperation {
+    private enum NioOperation {
+        /** Register connect key selection. */
+        CONNECT,
+
+        /** Cancel connect. */
+        CANCEL_CONNECT,
+
         /** Register read key selection. */
         REGISTER,
 
@@ -2914,7 +3082,7 @@ public class GridNioServer<T> {
      * Class for requesting write and session close operations.
      */
     private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
-        SessionChangeRequest {
+        SessionChangeRequest, GridNioKeyAttachment {
         /** Socket channel in register request. */
         @GridToStringExclude
         private SocketChannel sockCh;
@@ -2942,15 +3110,6 @@ public class GridNioServer<T> {
         private boolean skipRecovery;
 
         /**
-         * Creates registration request for a given socket channel.
-         *
-         * @param sockCh Socket channel to register on selector.
-         */
-        NioOperationFuture(SocketChannel sockCh) {
-            this(sockCh, true, null);
-        }
-
-        /**
          * @param sockCh Socket channel.
          * @param accepted {@code True} if socket has been accepted.
          * @param meta Optional meta.
@@ -3038,6 +3197,11 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean hasSession() {
+            return ses != null;
+        }
+
+        /** {@inheritDoc} */
         public NioOperation operation() {
             return op;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..71a3b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  * Note that this implementation requires non-null values for local and remote
  * socket addresses.
  */
-class GridSelectorNioSessionImpl extends GridNioSessionImpl {
+class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment {
     /** Pending write requests. */
     private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>();
 
@@ -129,6 +129,16 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean hasSession() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridSelectorNioSessionImpl session() {
+        return this;
+    }
+
     /**
      * @return Worker.
      */
@@ -385,22 +395,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         return inRecovery;
     }
 
-    /** {@inheritDoc} */
-    @Override public <T> T addMeta(int key, @Nullable T val) {
-        if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
-            outRecovery = (GridNioRecoveryDescriptor)val;
-
-            if (!outRecovery.pairedConnections())
-                inRecovery = outRecovery;
-
-            outRecovery.onConnected();
-
-            return null;
-        }
-        else
-            return super.addMeta(key, val);
-    }
-
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index b4bd34a..f8a0dce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     /** Allocate direct buffer or heap buffer. */
     private boolean directBuf;
 
-    /** Whether SSLEngine should use client mode. */
-    private boolean clientMode;
-
     /** Whether direct mode is used. */
     private boolean directMode;
 
@@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /**
-     * @param clientMode Flag indicating whether SSLEngine should use client mode..
-     */
-    public void clientMode(boolean clientMode) {
-        this.clientMode = clientMode;
-    }
-
-    /**
      *
      * @param directMode Flag indicating whether direct mode is used.
      */
@@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
         if (sslMeta == null) {
             engine = sslCtx.createSSLEngine();
 
+            boolean clientMode = !ses.accepted();
+
             engine.setUseClientMode(clientMode);
 
             if (!clientMode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index bab9cfa..0e712ae 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3162,10 +3162,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
 
-                            meta.put(-1, recoveryDesc);
+                            meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
                         }
 
-                        GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+                        GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
 
                         client = new GridTcpNioCommunicationClient(connIdx, ses, log);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index b3b5d1a..3ba319b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         // Try provoke connection close on socket writeTimeout.
         commSpi.setSharedMemoryPort(-1);
         commSpi.setMessageQueueLimit(10);
-        commSpi.setSocketReceiveBuffer(40);
-        commSpi.setSocketSendBuffer(40);
+        commSpi.setSocketReceiveBuffer(64);
+        commSpi.setSocketSendBuffer(64);
         commSpi.setSocketWriteTimeout(100);
         commSpi.setUnacknowledgedMessagesBufferSize(1000);
         commSpi.setConnectTimeout(10_000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 4dbb7ce..e623467 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
         try {
             SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
 
-            GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
+            GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null);
 
             ses = fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 1e25003..5d8e316 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.BasicAddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteCallable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 88276c2..e51dac8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     private static final Object mux = new Object();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     protected boolean useSsl = false;
 
     /**
@@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart(true);
+
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<Message> spi = getSpi(i);
 
@@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
-            node.order(i);
-
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
 
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             node.setAttributes(spi.getNodeAttributes());
             node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
 
+            node.order(i + 1);
+
             nodes.add(node);
 
             spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis.values()) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 78bf869..ce96c55 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     protected static final List<ClusterNode> nodes = new ArrayList<>();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     private static int port = 60_000;
 
     /** Use ssl. */
@@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart(true);
+
         for (int i = 0; i < SPI_CNT; i++) {
             CommunicationSpi<Message> spi = createSpi();
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
 
+            node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
+
             node.order(i + 1);
 
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
 
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index feaae11..1467c29 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -64,6 +66,9 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
     protected static final List<ClusterNode> nodes = new ArrayList<>();
 
     /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
+    /** */
     private static final int SPI_CNT = 2;
 
     /**
@@ -159,6 +164,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                     spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
                 }
 
+                U.sleep(500);
+
                 expMsgs += msgPerIter;
 
                 final long totAcked0 = totAcked;
@@ -166,9 +173,13 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                 for (TcpCommunicationSpi spi : spis) {
                     GridNioServer srv = U.field(spi, "nioSrvr");
 
-                    Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+                    final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 
-                    assertFalse(sessions.isEmpty());
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return !sessions.isEmpty();
+                        }
+                    }, 5_000);
 
                     boolean found = false;
 
@@ -282,7 +293,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
         int sentMsgs = 1;
 
-        for (int i = 0; i < 150; i++) {
+        for (int i = 0; i < 1280; i++) {
             try {
                 spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
 
@@ -379,11 +390,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart(true);
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -392,14 +407,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
             spi.setListener(new TestListener());
 
             node.setAttributes(spi.getNodeAttributes());
 
+            node.order(i);
+
             nodes.add(node);
 
             spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -455,6 +476,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 2a043ee..c722577 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteMock;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     /** Use ssl. */
     protected boolean useSsl;
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /**
      *
      */
@@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      * @return Timeout.
      */
     protected long awaitForSocketWriteTimeout() {
-        return 8000;
+        return 20_000;
     }
 
     /**
@@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             final AtomicInteger sentCnt = new AtomicInteger(1);
 
             int errCnt = 0;
@@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             expCnt1.incrementAndGet();
 
             int errCnt = 0;
@@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         ses1.resumeReads().get();
                     }
                     catch (IgniteCheckedException ignore) {
-                        // Can fail is ses1 was closed.
+                        // Can fail if ses1 was closed.
                     }
 
                     // Wait when session is closed, then try to open new connection from node1.
@@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             // Send message to establish connection.
             spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return lsnr1.rcvCnt.get() >= 1;
+                }
+            }, 1000);
+
             final AtomicInteger sentCnt = new AtomicInteger(1);
 
             int errCnt = 0;
@@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart(true);
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(i);
 
-            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
             IgniteTestResources rsrcs = new IgniteTestResources();
 
             GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
 
+            GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
             if (useSsl) {
                 IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
 
@@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 3f58055..d1689bd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     /** */
     private static final int SPI_CNT = 2;
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /**
      *
      */
@@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
         /** {@inheritDoc} */
         @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
-            info("Test listener received message: " + msg);
-
             assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
 
             GridTestMessage msg0 = (GridTestMessage)msg;
@@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                     spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
 
                     spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
+
+                    if (j == 0) {
+                        final TestListener lsnr0 = (TestListener)spi0.getListener();
+                        final TestListener lsnr1 = (TestListener)spi1.getListener();
+
+                        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1;
+                            }
+                        }, 1000);
+                    }
                 }
 
                 expMsgs += msgPerIter;
@@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart(true);
+
         for (int i = 0; i < SPI_CNT; i++) {
             TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
 
@@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             ctx.setLocalNode(node);
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             spiRsrcs.add(rsrcs);
 
             rsrcs.inject(spi);
@@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             node.setAttributes(spi.getNodeAttributes());
 
+            node.order(i);
+
             nodes.add(node);
 
             spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private void stopSpis() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index e215a34..842f283 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -202,8 +202,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(1);
 
         grid(0).events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event event) {
+            @Override public boolean apply(Event evt) {
                 latch.countDown();
 
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index 8a20eec..a241a04 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication {
 
                 HandshakeFinish fin = new HandshakeFinish();
 
-                GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+                GridNioFuture<GridNioSession> sesFut =
+                    nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null);
+
+                GridNioSession ses = sesFut.get();
 
                 client = new HadoopTcpNioCommunicationClient(ses);
 


Mime
View raw message