ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: io optimizations
Date Wed, 05 Oct 2016 08:50:31 GMT
io optimizations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97176b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97176b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97176b4

Branch: refs/heads/ignite-comm-opts2
Commit: e97176b425bdb07987d8e00068f0c4507db33f26
Parents: 9b1c3a5
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 5 11:50:19 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 5 11:50:19 2016 +0300

----------------------------------------------------------------------
 .../rest/protocols/tcp/MockNioSession.java      |  11 +
 .../apache/ignite/internal/IgniteKernal.java    |  16 +-
 .../managers/communication/GridIoManager.java   | 131 +++-
 .../internal/util/ipc/IpcToNioAdapter.java      |   2 +-
 .../nio/GridConnectionBytesVerifyFilter.java    |   4 +-
 .../util/nio/GridNioAsyncNotifyFilter.java      |   4 +-
 .../internal/util/nio/GridNioCodecFilter.java   |   6 +-
 .../ignite/internal/util/nio/GridNioFilter.java |  10 +-
 .../internal/util/nio/GridNioFilterAdapter.java |   4 +-
 .../internal/util/nio/GridNioFilterChain.java   |   8 +-
 .../ignite/internal/util/nio/GridNioFuture.java |   4 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  66 +-
 .../ignite/internal/util/nio/GridNioServer.java | 695 +++++++++++++------
 .../internal/util/nio/GridNioSession.java       |  11 +
 .../internal/util/nio/GridNioSessionImpl.java   |  19 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  44 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  44 +-
 .../internal/util/nio/SessionWriteRequest.java  |  85 +++
 .../internal/util/nio/ssl/GridNioSslFilter.java |   4 +-
 .../util/nio/ssl/GridNioSslHandler.java         |   4 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 143 ++--
 .../nio/impl/GridNioFilterChainSelfTest.java    |  18 +-
 .../io/IgniteIoTestAbstractBenchmark.java       |  61 ++
 .../io/IgniteIoTestSendAllBenchmark.java        |  32 +
 .../io/IgniteIoTestSendRandomBenchmark.java     |  35 +
 25 files changed, 1075 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index e848653..9bc4e7f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.rest.protocols.tcp;
 
 import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
@@ -111,6 +112,11 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     }
 
     /** {@inheritDoc} */
+    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public GridNioFuture<Object> resumeReads() {
         return null;
     }
@@ -149,4 +155,9 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 65740a9..065f2f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3422,11 +3422,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /**
      * @param node Node.
      * @param payload Message payload.
-     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
      * @return Response future.
      */
-    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) {
-        return ctx.io().sendIoTest(node, payload, processFromNioThread);
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+        return ctx.io().sendIoTest(node, payload, procFromNioThread);
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param payload Message payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+        return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index d6a2835..b465919 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -215,7 +215,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private boolean stopping;
 
     /** */
-    private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>> ioTestMap = new AtomicReference<>();
+    private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
 
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
@@ -384,45 +384,72 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     }
                 }
                 else {
-                    GridFutureAdapter fut = ioTestMap().remove(msg0.id());
+                    IoTestFuture fut = ioTestMap().get(msg0.id());
 
-                    if (fut == null) {
+                    if (fut == null)
                         U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
-
-                        return;
-                    }
-
-                    fut.onDone();
+                    else
+                        fut.onResponse();
                 }
             }
         });
     }
 
     /**
-     * @param node Node.
+     * @param nodes Nodes.
      * @param payload Payload.
-     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
      * @return Response future.
      */
-    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) {
-        if (ctx.localNodeId().equals(node.id()))
-            throw new IllegalArgumentException();
-
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
         long id = ioTestId.getAndIncrement();
 
-        GridFutureAdapter fut = new GridFutureAdapter();
+        IoTestFuture fut = new IoTestFuture(id, nodes.size());
+
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+        msg.processFromNioThread(procFromNioThread);
 
         ioTestMap().put(id, fut);
 
-        try {
-            IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            try {
+                send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                ioTestMap().remove(msg.id());
+
+                return new GridFinishedFuture(e);
+            }
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param node Node.
+     * @param payload Payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+        long id = ioTestId.getAndIncrement();
 
-            msg.processFromNioThread(processFromNioThread);
+        IoTestFuture fut = new IoTestFuture(id, 1);
 
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+        msg.processFromNioThread(procFromNioThread);
+
+        ioTestMap().put(id, fut);
+
+        try {
             send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
-            ioTestMap().remove(id);
+            ioTestMap().remove(msg.id());
 
             return new GridFinishedFuture(e);
         }
@@ -433,8 +460,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @return IO test futures map.
      */
-    private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() {
-        ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get();
+    private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
+        ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get();
 
         if (map == null) {
             if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
@@ -698,16 +725,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            // Check discovery.
-            ClusterNode node = ctx.discovery().node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
-
-                return; // We can't receive messages from non-discovered ones.
-            }
-
             if (msg.topic() == null) {
                 int topicOrd = msg.topicOrdinal();
 
@@ -2720,4 +2737,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
+
+    /**
+     *
+     */
+    private class IoTestFuture extends GridFutureAdapter<Object> {
+        /** */
+        private final long id;
+
+        /** */
+        private int cntr;
+
+        /**
+         * @param id ID.
+         * @param cntr Counter.
+         */
+        IoTestFuture(long id, int cntr) {
+            assert cntr > 0 : cntr;
+
+            this.id = id;
+            this.cntr = cntr;
+        }
+
+        /**
+         *
+         */
+        void onResponse() {
+            boolean complete;
+
+            synchronized (this) {
+                complete = --cntr == 0;
+            }
+
+            if (complete)
+                onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                ioTestMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IoTestFuture.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 6820dc7..d108b56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 13d7ca7..02a637d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -67,8 +67,8 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 9925d2e..8aa113a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -107,8 +107,8 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index a2f543d..f7a3022 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -76,15 +76,15 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         // No encoding needed in direct mode.
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         try {
             ByteBuffer res = parser.encode(ses, msg);
 
-            return proceedSessionWrite(ses, res);
+            return proceedSessionWrite(ses, res, fut);
         }
         catch (IOException e) {
             throw new GridNioException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index 5f88b1f..f126cd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -105,10 +105,11 @@ public interface GridNioFilter {
      *
      * @param ses Session instance.
      * @param msg Message to send.
-     * @return Write future.
+     * @param fut {@code True} if write future should be created.
+     * @return Write future or {@code null}.
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
-    public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
 
     /**
      * Forwards session close request to the next logical filter in filter chain.
@@ -149,10 +150,11 @@ public interface GridNioFilter {
      *
      * @param ses Session on which message should be written.
      * @param msg Message being written.
-     * @return Write future.
+     * @param fut {@code True} if write future should be created.
+     * @return Write future or {@code null}.
      * @throws GridNioException If GridNioException occurred while handling event.
      */
-    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
 
     /**
      * Invoked when a new messages received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 18ab1b2..0a825bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -108,10 +108,10 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         checkNext();
 
-        return nextFilter.onSessionWrite(ses, msg);
+        return nextFilter.onSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index 8a43e29..6415b46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -181,8 +181,8 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
      * @return Send future.
      * @throws IgniteCheckedException If IgniteCheckedException occurred while handling event.
      */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return tail.onSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return tail.onSessionWrite(ses, msg, fut);
     }
 
     /**
@@ -255,9 +255,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg)
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
             throws IgniteCheckedException {
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index b02acc8..6c0c9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
     /**
      * Sets ack closure which will be applied when ack received.
      *
-     * @param closure Ack closure.
+     * @param c Ack closure.
      */
-    public void ackClosure(IgniteInClosure<IgniteException> closure);
+    public void ackClosure(IgniteInClosure<IgniteException> c);
 
     /**
      * The method will be called when ack received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 4598eef..dfc78be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -36,8 +36,8 @@ public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;
 
-    /** Unacknowledged message futures. */
-    private final ArrayDeque<GridNioFuture<?>> msgFuts;
+    /** Unacknowledged messages. */
+    private final ArrayDeque<SessionWriteRequest> msgReqs;
 
     /** Number of messages to resend. */
     private int resendCnt;
@@ -87,7 +87,7 @@ public class GridNioRecoveryDescriptor {
         assert !node.isLocal() : node;
         assert queueLimit > 0;
 
-        msgFuts = new ArrayDeque<>(queueLimit);
+        msgReqs = new ArrayDeque<>(queueLimit);
 
         this.queueLimit = queueLimit;
         this.node = node;
@@ -155,19 +155,19 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param fut NIO future.
+     * @param req Write request.
      * @return {@code False} if queue limit is exceeded.
      */
-    public boolean add(GridNioFuture<?> fut) {
-        assert fut != null;
+    public boolean add(SessionWriteRequest req) {
+        assert req != null;
 
-        if (!fut.skipRecovery()) {
+        if (!req.skipRecovery()) {
             if (resendCnt == 0) {
-                msgFuts.addLast(fut);
+                msgReqs.addLast(req);
 
                 sentCnt++;
 
-                return msgFuts.size() < queueLimit;
+                return msgReqs.size() < queueLimit;
             }
             else
                 resendCnt--;
@@ -182,17 +182,15 @@ public class GridNioRecoveryDescriptor {
     public void ackReceived(long rcvCnt) {
         if (log.isDebugEnabled())
             log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
-                ", msgFuts=" + msgFuts.size() + ']');
+                ", msgReqs=" + msgReqs.size() + ']');
 
         while (acked < rcvCnt) {
-            GridNioFuture<?> fut = msgFuts.pollFirst();
+            SessionWriteRequest fut = msgReqs.pollFirst();
 
             assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
                 ", acked=" + acked +
                 ", desc=" + this + ']';
 
-            assert fut.isDone() : fut;
-
             if (fut.ackClosure() != null)
                 fut.ackClosure().apply(null);
 
@@ -215,7 +213,7 @@ public class GridNioRecoveryDescriptor {
      * @return {@code False} if descriptor is reserved.
      */
     public boolean onNodeLeft() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] reqs = null;
 
         synchronized (this) {
             nodeLeft = true;
@@ -223,15 +221,15 @@ public class GridNioRecoveryDescriptor {
             if (reserved)
                 return false;
 
-            if (!msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+            if (!msgReqs.isEmpty()) {
+                reqs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
 
-                msgFuts.clear();
+                msgReqs.clear();
             }
         }
 
-        if (futs != null)
-            completeOnNodeLeft(futs);
+        if (reqs != null)
+            notifyOnNodeLeft(reqs);
 
         return true;
     }
@@ -239,8 +237,8 @@ public class GridNioRecoveryDescriptor {
     /**
      * @return Message futures for unacknowledged messages.
      */
-    public Deque<GridNioFuture<?>> messagesFutures() {
-        return msgFuts;
+    public Deque<SessionWriteRequest> messagesFutures() {
+        return msgReqs;
     }
 
     /**
@@ -278,7 +276,7 @@ public class GridNioRecoveryDescriptor {
             if (!nodeLeft)
                 ackReceived(rcvCnt);
 
-            resendCnt = msgFuts.size();
+            resendCnt = msgReqs.size();
         }
     }
 
@@ -337,7 +335,7 @@ public class GridNioRecoveryDescriptor {
      *
      */
     public void release() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] futs = null;
 
         synchronized (this) {
             connected = false;
@@ -357,15 +355,15 @@ public class GridNioRecoveryDescriptor {
                 notifyAll();
             }
 
-            if (nodeLeft && !msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+            if (nodeLeft && !msgReqs.isEmpty()) {
+                futs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
 
-                msgFuts.clear();
+                msgReqs.clear();
             }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs);
+            notifyOnNodeLeft(futs);
     }
 
     /**
@@ -426,16 +424,16 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param futs Futures to complete.
+     * @param reqs Requests to notify about error.
      */
-    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
-        for (GridNioFuture<?> msg : futs) {
-            IOException e = new IOException("Failed to send message, node has left: " + node.id());
+    private void notifyOnNodeLeft(SessionWriteRequest[] reqs) {
+        IOException e = new IOException("Failed to send message, node has left: " + node.id());
 
-            ((GridNioFutureImpl)msg).onDone(e);
+        for (SessionWriteRequest req : reqs) {
+            req.onError(e);
 
-            if (msg.ackClosure() != null)
-                msg.ackClosure().apply(new IgniteException(e));
+            if (req.ackClosure() != null)
+                req.ackClosure().apply(new IgniteException(e));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ba8ae50..281d985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -412,60 +412,87 @@ public class GridNioServer<T> {
     /**
      * @param ses Session.
      * @param msg Message.
+     * @param createFut {@code True} if future should be created.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException {
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
      * @param ses Session.
      * @param msg Message.
+     * @param createFut {@code True} if future should be created.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg) {
+    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
      * @param ses Session.
-     * @param fut Future.
+     * @param req Request.
      * @param sys System message flag.
+     * @throws IgniteCheckedException If session was closed.
      */
-    private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
+    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException {
         assert ses != null;
-        assert fut != null;
+        assert req != null;
 
-        int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
+        int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
 
         IgniteInClosure<IgniteException> ackC;
 
         if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
-            fut.ackClosure(ackC);
+            req.ackClosure(ackC);
 
         if (ses.closed()) {
-            if (ses.removeFuture(fut))
-                fut.connectionClosed();
+            if (ses.removeFuture(req)) {
+                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
+
+                req.onError(err);
+
+                if (!(req instanceof GridNioFuture))
+                    throw new IgniteCheckedException(err);
+            }
         }
         else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true))
-                clientWorkers.get(ses.selectorIndex()).offer(fut);
+            clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req);
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -476,10 +503,10 @@ public class GridNioServer<T> {
      *
      * @param ses Session.
      * @param msg Message.
-     * @return Future.
+     * @throws IgniteCheckedException If session was closed.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) {
-        return sendSystem(ses, msg, null);
+    public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException {
+        sendSystem(ses, msg, null);
     }
 
     /**
@@ -488,27 +515,30 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param lsnr Future listener notified from the session thread.
-     * @return Future.
+     * @throws IgniteCheckedException If session was closed.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses,
+    public void sendSystem(GridNioSession ses,
         Message msg,
-        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
-
         if (lsnr != null) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
+
             fut.listen(lsnr);
 
             assert !fut.isDone();
-        }
 
-        send0(impl, fut, true);
+            send0(impl, fut, true);
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
 
-        return fut;
+            send0(impl, req, true);
+        }
     }
 
     /**
@@ -520,25 +550,25 @@ public class GridNioServer<T> {
         GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
         if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
-            Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+            Deque<SessionWriteRequest> futs = recoveryDesc.messagesFutures();
 
             if (log.isDebugEnabled())
                 log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
 
             GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
 
-            GridNioFuture<?> fut0 = futs.iterator().next();
+            SessionWriteRequest fut0 = futs.iterator().next();
 
-            for (GridNioFuture<?> fut : futs) {
+            for (SessionWriteRequest fut : futs) {
                 fut.messageThread(true);
 
-                ((NioOperationFuture)fut).resetSession(ses0);
+                fut.resetSession(ses0);
             }
 
             ses0.resend(futs);
 
             // Wake up worker.
-            clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+            clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0));
         }
     }
 
@@ -554,7 +584,7 @@ public class GridNioServer<T> {
      * @param op Operation.
      * @return Future for operation.
      */
-    GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
+    private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
         assert ses instanceof GridSelectorNioSessionImpl;
         assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
 
@@ -835,21 +865,30 @@ public class GridNioServer<T> {
 
             while (true) {
                 ByteBuffer buf = ses.removeMeta(BUF_META_KEY);
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 // Check if there were any pending data from previous writes.
                 if (buf == null) {
                     assert req == null;
 
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                    req = ses.pollFuture();
 
                     if (req == null) {
-                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        if (ses.procWrite.get()) {
+                            boolean set = ses.procWrite.compareAndSet(true, false);
+
+                            assert set;
+
+                            if (ses.writeQueue().isEmpty())
+                                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            else
+                                ses.procWrite.set(true);
+                        }
 
                         break;
                     }
 
-                    buf = req.message();
+                    buf = (ByteBuffer)req.message();
                 }
 
                 if (!skipWrite) {
@@ -884,7 +923,7 @@ public class GridNioServer<T> {
                     // Message was successfully written.
                     assert req != null;
 
-                    req.onDone();
+                    req.onMessageWritten();
                 }
             }
         }
@@ -964,6 +1003,12 @@ public class GridNioServer<T> {
                     readBuf.compact();
                 else
                     readBuf.clear();
+
+                if (ses.hasSystemMessage() && !ses.procWrite.get()) {
+                    ses.procWrite.set(true);
+
+                    registerWrite(ses);
+                }
             }
             catch (IgniteCheckedException e) {
                 close(ses, e);
@@ -1036,16 +1081,29 @@ public class GridNioServer<T> {
                 if (ses.meta(WRITE_BUF_LIMIT) != null)
                     buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
 
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 while (true) {
                     if (req == null) {
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = systemMessage(ses);
 
-                        if (req == null && buf.position() == 0) {
-                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        if (req == null) {
+                            req = ses.pollFuture();
 
-                            break;
+                            if (req == null && buf.position() == 0) {
+                                if (ses.procWrite.get()) {
+                                    boolean set = ses.procWrite.compareAndSet(true, false);
+
+                                    assert set;
+
+                                    if (ses.writeQueue().isEmpty())
+                                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                                    else
+                                        ses.procWrite.set(true);
+                                }
+
+                                break;
+                            }
                         }
                     }
 
@@ -1053,7 +1111,7 @@ public class GridNioServer<T> {
                     boolean finished = false;
 
                     if (req != null) {
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1068,14 +1126,17 @@ public class GridNioServer<T> {
 
                     // Fill up as many messages as possible to write buffer.
                     while (finished) {
-                        req.onDone();
+                        req.onMessageWritten();
+
+                        req = systemMessage(ses);
 
-                        req = (NioOperationFuture<?>)ses.pollFuture();
+                        if (req == null)
+                            req = ses.pollFuture();
 
                         if (req == null)
                             break;
 
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1179,6 +1240,24 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session.
+         * @return System message request.
+         */
+        private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) {
+            if (ses.hasSystemMessage()) {
+                Object msg = ses.systemMessage();
+
+                SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
+
+                assert !ses.hasSystemMessage();
+
+                return req;
+            }
+
+            return null;
+        }
+
+        /**
          * Processes write-ready event on the key.
          *
          * @param key Key that is ready to be written.
@@ -1190,7 +1269,7 @@ public class GridNioServer<T> {
 
             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
             ByteBuffer buf = ses.writeBuffer();
-            NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+            SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
@@ -1204,21 +1283,25 @@ public class GridNioServer<T> {
             }
 
             if (req == null) {
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
 
-                if (req == null && buf.position() == 0) {
-                    if (ses.procWrite.get()) {
-                        boolean set = ses.procWrite.compareAndSet(true, false);
+                if (req == null) {
+                    req = ses.pollFuture();
 
-                        assert set;
+                    if (req == null && buf.position() == 0) {
+                        if (ses.procWrite.get()) {
+                            boolean set = ses.procWrite.compareAndSet(true, false);
 
-                        if (ses.writeQueue().isEmpty())
-                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-                        else
-                            ses.procWrite.set(true);
-                    }
+                            assert set;
 
-                    return;
+                            if (ses.writeQueue().isEmpty())
+                                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                            else
+                                ses.procWrite.set(true);
+                        }
+
+                        return;
+                    }
                 }
             }
 
@@ -1226,9 +1309,9 @@ public class GridNioServer<T> {
             boolean finished = false;
 
             if (req != null) {
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
-                assert msg != null;
+                assert msg != null : req;
 
                 if (writer != null)
                     writer.setCurrentWriteClass(msg.getClass());
@@ -1241,14 +1324,17 @@ public class GridNioServer<T> {
 
             // Fill up as many messages as possible to write buffer.
             while (finished) {
-                req.onDone();
+                req.onMessageWritten();
 
-                req = (NioOperationFuture<?>)ses.pollFuture();
+                req = systemMessage(ses);
+
+                if (req == null)
+                    req = ses.pollFuture();
 
                 if (req == null)
                     break;
 
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
                 assert msg != null;
 
@@ -1301,7 +1387,7 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker {
         /** Queue of change requests on this selector. */
-        private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+        private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;
@@ -1409,7 +1495,7 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        private void offer(NioOperationFuture req) {
+        private void offer(SessionChangeRequest req) {
             changeReqs.offer(req);
 
             selector.wakeup();
@@ -1426,31 +1512,27 @@ public class GridNioServer<T> {
                 long lastIdleCheck = U.currentTimeMillis();
 
                 while (!closed && selector.isOpen()) {
-                    NioOperationFuture req;
+                    SessionChangeRequest req0;
 
-                    while ((req = changeReqs.poll()) != null) {
-                        switch (req.operation()) {
+                    while ((req0 = changeReqs.poll()) != null) {
+                        switch (req0.operation()) {
                             case REGISTER: {
-                                register(req);
+                                register((NioOperationFuture)req0);
 
                                 break;
                             }
 
                             case REQUIRE_WRITE: {
-                                //Just register write key.
-                                SelectionKey key = req.session().key();
-
-                                if (key.isValid()) {
-                                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                                SessionWriteRequest req = (SessionWriteRequest)req0;
 
-                                    // Update timestamp to protected against false write timeout.
-                                    ((GridNioSessionImpl)key.attachment()).bytesSent(0);
-                                }
+                                registerWrite((GridSelectorNioSessionImpl)req.session());
 
                                 break;
                             }
 
                             case CLOSE: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 if (close(req.session(), null))
                                     req.onDone(true);
                                 else
@@ -1460,6 +1542,8 @@ public class GridNioServer<T> {
                             }
 
                             case PAUSE_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1478,6 +1562,8 @@ public class GridNioServer<T> {
                             }
 
                             case RESUME_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1496,75 +1582,15 @@ public class GridNioServer<T> {
                             }
 
                             case DUMP_STATS: {
-                                StringBuilder sb = new StringBuilder();
-
-                                Set<SelectionKey> keys = selector.keys();
-
-                                sb.append(U.nl())
-                                    .append(">> Selector info [idx=").append(idx)
-                                    .append(", keysCnt=").append(keys.size())
-                                    .append("]").append(U.nl());
-
-                                for (SelectionKey key : keys) {
-                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                                    MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                                    MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
-
-                                    sb.append("    Connection info [")
-                                        .append("in=").append(ses.accepted())
-                                        .append(", rmtAddr=").append(ses.remoteAddress())
-                                        .append(", locAddr=").append(ses.localAddress());
-
-                                    GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
-
-                                    if (outDesc != null) {
-                                        sb.append(", msgsSent=").append(outDesc.sent())
-                                            .append(", msgsAckedByRmt=").append(outDesc.acked())
-                                            .append(", descIdHash=").append(System.identityHashCode(outDesc));
-                                    }
-                                    else
-                                        sb.append(", outRecoveryDesc=null");
-
-                                    GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+                                NioOperationFuture req = (NioOperationFuture)req0;
 
-                                    if (inDesc != null) {
-                                        sb.append(", msgsRcvd=").append(inDesc.received())
-                                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
-                                            .append(", descIdHash=").append(System.identityHashCode(inDesc));
-                                    }
-                                    else
-                                        sb.append(", inRecoveryDesc=null");
-
-                                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                                        .append(", bytesSent=").append(ses.bytesSent())
-                                        .append(", opQueueSize=").append(ses.writeQueueSize())
-                                        .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                                        .append(", msgReader=").append(reader != null ? reader.toString() : "null");
-
-                                    int cnt = 0;
-
-                                    for (GridNioFuture<?> fut : ses.writeQueue()) {
-                                        if (cnt == 0)
-                                            sb.append(",\n opQueue=[").append(fut);
-                                        else
-                                            sb.append(',').append(fut);
-
-                                        if (++cnt == 5) {
-                                            sb.append(']');
-
-                                            break;
-                                        }
-                                    }
-
-
-                                    sb.append("]").append(U.nl());
+                                try {
+                                    dumpStats();
+                                }
+                                finally {
+                                    // Complete the request just in case (none should wait on this future).
+                                    req.onDone(true);
                                 }
-
-                                U.warn(log, sb.toString());
-
-                                // Complete the request just in case (none should wait on this future).
-                                req.onDone(true);
                             }
                         }
                     }
@@ -1616,6 +1642,93 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param ses Session.
+         */
+        final void registerWrite(GridSelectorNioSessionImpl ses) {
+            SelectionKey key = ses.key();
+
+            if (key.isValid()) {
+                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+                // Update timestamp to protect against false write timeout.
+                ses.bytesSent(0);
+            }
+        }
+
+        /**
+         *
+         */
+        private void dumpStats() {
+            StringBuilder sb = new StringBuilder();
+
+            Set<SelectionKey> keys = selector.keys();
+
+            sb.append(U.nl())
+                .append(">> Selector info [idx=").append(idx)
+                .append(", keysCnt=").append(keys.size())
+                .append("]").append(U.nl());
+
+            for (SelectionKey key : keys) {
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+                sb.append("    Connection info [")
+                    .append("in=").append(ses.accepted())
+                    .append(", rmtAddr=").append(ses.remoteAddress())
+                    .append(", locAddr=").append(ses.localAddress());
+
+                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+
+                if (outDesc != null) {
+                    sb.append(", msgsSent=").append(outDesc.sent())
+                        .append(", msgsAckedByRmt=").append(outDesc.acked())
+                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
+                }
+                else
+                    sb.append(", outRecoveryDesc=null");
+
+                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+
+                if (inDesc != null) {
+                    sb.append(", msgsRcvd=").append(inDesc.received())
+                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
+                }
+                else
+                    sb.append(", inRecoveryDesc=null");
+
+                sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                    .append(", bytesSent=").append(ses.bytesSent())
+                    .append(", opQueueSize=").append(ses.writeQueueSize())
+                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+
+                int cnt = 0;
+
+                for (SessionWriteRequest req : ses.writeQueue()) {
+                    if (cnt == 0)
+                        sb.append(",\n opQueue=[").append(req);
+                    else
+                        sb.append(',').append(req);
+
+                    if (++cnt == 5) {
+                        sb.append(']');
+
+                        break;
+                    }
+                }
+
+
+                sb.append("]").append(U.nl());
+            }
+
+            U.warn(log, sb.toString());
+        }
+
+        /**
          * Processes keys selected by a selector.
          *
          * @param keys Selected keys.
@@ -1886,17 +1999,19 @@ public class GridNioServer<T> {
                 ses.removeMeta(BUF_META_KEY);
 
                 // Since ses is in closed state, no write requests will be added.
-                NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
                 GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
 
+                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
+
                 if (outRecovery != null || inRecovery != null) {
                     try {
                         // Poll will update recovery data.
-                        while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
-                            if (fut.skipRecovery())
-                                fut.connectionClosed();
+                        while ((req = ses.pollFuture()) != null) {
+                            if (req.skipRecovery())
+                                req.onError(err);
                         }
                     }
                     finally {
@@ -1908,11 +2023,11 @@ public class GridNioServer<T> {
                     }
                 }
                 else {
-                    if (fut != null)
-                        fut.connectionClosed();
+                    if (req != null)
+                        req.onError(err);
 
-                    while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
-                        fut.connectionClosed();
+                    while ((req = ses.pollFuture()) != null)
+                        req.onError(err);
                 }
 
                 try {
@@ -2136,9 +2251,193 @@ public class GridNioServer<T> {
     }
 
     /**
+     *
+     */
+    private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** */
+        private final Object msg;
+
+        /** */
+        private final GridNioSession ses;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         */
+        WriteRequestSystemImpl(GridNioSession ses, Object msg) {
+            this.ses = ses;
+            this.msg = msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean messageThread() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestSystemImpl.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** */
+        private GridNioSession ses;
+
+        /** */
+        private final Object msg;
+
+        /** */
+        private boolean msgThread;
+
+        /** */
+        private final boolean skipRecovery;
+
+        /** */
+        private IgniteInClosure<IgniteException> ackC;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         * @param skipRecovery Skip recovery flag.
+         */
+        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+            this.ses = ses;
+            this.msg = msg;
+            this.skipRecovery = skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            this.msgThread = msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean messageThread() {
+            return msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            ackC = c;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            assert msg instanceof Message;
+
+            ((Message)msg).onAckReceived();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return ackC;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            this.ses = ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestImpl.class, this);
+        }
+    }
+
+    /**
      * Class for requesting write and session close operations.
      */
-    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> {
+    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
+        SessionChangeRequest {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -2154,11 +2453,7 @@ public class GridNioServer<T> {
         private NioOperation op;
 
         /** Message. */
-        @GridToStringExclude
-        private ByteBuffer msg;
-
-        /** Direct message. */
-        private Message commMsg;
+        private Object msg;
 
         /** */
         @GridToStringExclude
@@ -2220,8 +2515,7 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          * @param msg Message.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            ByteBuffer msg) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -2249,38 +2543,25 @@ public class GridNioServer<T> {
 
             this.ses = ses;
             this.op = op;
-            this.commMsg = commMsg;
+            this.msg = commMsg;
             this.skipRecovery = skipRecovery;
         }
 
-        /**
-         * @return Requested change operation.
-         */
-        private NioOperation operation() {
+        /** {@inheritDoc} */
+        public NioOperation operation() {
             return op;
         }
 
-        /**
-         * @return Message.
-         */
-        private ByteBuffer message() {
+        /** {@inheritDoc} */
+        public Object message() {
             return msg;
         }
 
-        /**
-         * @return Direct message.
-         */
-        private Message directMessage() {
-            return commMsg;
-        }
-
-        /**
-         * @param ses New session instance.
-         */
-        private void resetSession(GridSelectorNioSessionImpl ses) {
-            assert commMsg != null;
+        /** {@inheritDoc} */
+        public void resetSession(GridNioSession ses) {
+            assert msg instanceof Message : msg;
 
-            this.ses = ses;
+            this.ses = (GridSelectorNioSessionImpl)ses;
         }
 
         /**
@@ -2290,10 +2571,8 @@ public class GridNioServer<T> {
             return sockCh;
         }
 
-        /**
-         * @return Session for this change request.
-         */
-        private GridSelectorNioSessionImpl session() {
+        /** {@inheritDoc} */
+        public GridSelectorNioSessionImpl session() {
             return ses;
         }
 
@@ -2311,21 +2590,21 @@ public class GridNioServer<T> {
             return meta;
         }
 
-        /**
-         * Applicable to write futures only. Fails future with corresponding IOException.
-         */
-        private void connectionClosed() {
-            assert op == NioOperation.REQUIRE_WRITE;
-            assert ses != null;
-
-            onDone(new IOException("Failed to send message (connection was closed): " + ses));
+        /** {@inheritDoc} */
+        @Override public void onError(Exception e) {
+            onDone(e);
         }
 
         /** {@inheritDoc} */
         @Override public void onAckReceived() {
-            assert commMsg != null;
+            assert msg instanceof Message : msg;
+
+            ((Message)msg).onAckReceived();
+        }
 
-            commMsg.onAckReceived();
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            onDone();
         }
 
         /** {@inheritDoc} */
@@ -2369,7 +2648,7 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
@@ -2388,10 +2667,10 @@ public class GridNioServer<T> {
                     return null;
                 }
                 else
-                    return send(ses, (Message)msg);
+                    return send(ses, (Message)msg, fut);
             }
             else
-                return send(ses, (ByteBuffer)msg);
+                return send(ses, (ByteBuffer)msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -2759,4 +3038,14 @@ public class GridNioServer<T> {
             return this;
         }
     }
+
+    /**
+     * Change request that exposes the {@link NioOperation} it asks for.
+     */
+    interface SessionChangeRequest {
+        /**
+         * @return Requested change operation.
+         */
+        NioOperation operation();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index 1e427d8..c1b60ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -105,6 +106,11 @@ public interface GridNioSession {
     public GridNioFuture<?> send(Object msg);
 
     /**
+     * @param msg Message to be sent.
+     */
+    public void sendNoFuture(Object msg) throws IgniteCheckedException;
+
+    /**
      * Gets metadata associated with specified key.
      *
      * @param key Key to look up.
@@ -174,4 +180,9 @@ public interface GridNioSession {
      * @return Recovery descriptor if recovery is supported, {@code null otherwise.}
      */
     @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
+
+    /**
+     * @param msg System message to send.
+     */
+    public void systemMessage(Object msg);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 53a624d..594bca2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -99,7 +99,7 @@ public class GridNioSessionImpl implements GridNioSession {
         try {
             resetSendScheduleTime();
 
-            return chain().onSessionWrite(this, msg);
+            return chain().onSessionWrite(this, msg, true);
         }
         catch (IgniteCheckedException e) {
             close();
@@ -109,6 +109,18 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+        try {
+            chain().onSessionWrite(this, msg, false);
+        }
+        catch (IgniteCheckedException e) {
+            close();
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public GridNioFuture<?> resumeReads() {
         try {
             return chain().onResumeReads(this);
@@ -316,6 +328,11 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioSessionImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 6b00281..245d5b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+    private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude
@@ -68,6 +68,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** */
     final AtomicBoolean procWrite = new AtomicBoolean();
 
+    /** */
+    private Object sysMsg;
+
     /**
      * Creates session instance.
      *
@@ -166,7 +169,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request.
      * @return Updated size of the queue.
      */
-    int offerSystemFuture(GridNioFuture<?> writeFut) {
+    int offerSystemFuture(SessionWriteRequest writeFut) {
         writeFut.messageThread(true);
 
         boolean res = queue.offerFirst(writeFut);
@@ -186,7 +189,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request to add.
      * @return Updated size of the queue.
      */
-    int offerFuture(GridNioFuture<?> writeFut) {
+    int offerFuture(SessionWriteRequest writeFut) {
         boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();
 
         if (sem != null && !msgThread)
@@ -204,7 +207,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @param futs Futures to resend.
      */
-    void resend(Collection<GridNioFuture<?>> futs) {
+    void resend(Collection<SessionWriteRequest> futs) {
         assert queue.isEmpty() : queue.size();
 
         boolean add = queue.addAll(futs);
@@ -215,8 +218,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @return Message that is in the head of the queue, {@code null} if queue is empty.
      */
-    @Nullable GridNioFuture<?> pollFuture() {
-        GridNioFuture<?> last = queue.poll();
+    @Nullable SessionWriteRequest pollFuture() {
+        SessionWriteRequest last = queue.poll();
 
         if (last != null) {
             if (sem != null && !last.messageThread())
@@ -246,7 +249,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param fut Future.
      * @return {@code True} if future was removed from queue.
      */
-    boolean removeFuture(GridNioFuture<?> fut) {
+    boolean removeFuture(SessionWriteRequest fut) {
         assert closed();
 
         return queue.removeLastOccurrence(fut);
@@ -264,7 +267,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @return Write requests.
      */
-    Collection<GridNioFuture<?>> writeQueue() {
+    Collection<SessionWriteRequest> writeQueue() {
         return queue;
     }
 
@@ -321,6 +324,31 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object sysMsg) {
+        this.sysMsg = sysMsg;
+    }
+
+    /**
+     * @return {@code True} if have pending system message to send.
+     */
+    boolean hasSystemMessage() {
+        return sysMsg != null;
+    }
+
+    /**
+     * Gets and clears pending system message.
+     *
+     * @return Pending system message.
+     */
+    Object systemMessage() {
+        Object ret = sysMsg;
+
+        sysMsg = null;
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index fcb40c7..c98c3aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -108,40 +108,36 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> c)
         throws IgniteCheckedException {
-        // Node ID is never provided in asynchronous send mode.
-        assert nodeId == null;
+        try {
+            // Node ID is never provided in asynchronous send mode.
+            assert nodeId == null;
 
-        if (closure != null)
-            ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+            if (c != null)
+                ses.addMeta(ACK_CLOSURE.ordinal(), c);
 
-        GridNioFuture<?> fut = ses.send(msg);
+            ses.sendNoFuture(msg);
 
-        if (fut.isDone()) {
-            try {
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                if (closure != null)
-                    ses.removeMeta(ACK_CLOSURE.ordinal());
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
+        }
+        catch (IgniteCheckedException e) {
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
 
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
-                if (e.getCause() instanceof IOException) {
-                    ses.close();
+            if (e.getCause() instanceof IOException) {
+                ses.close();
 
-                    return true;
-                }
-                else
-                    throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+                return true;
             }
+            else
+                throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
         }
 
-        if (closure != null)
-            ses.removeMeta(ACK_CLOSURE.ordinal());
-
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
new file mode 100644
index 0000000..508c791
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Write request kept in a NIO session's queue of pending writes.
+ */
+public interface SessionWriteRequest {
+    /**
+     * Sets flag indicating that message send future was created in thread that was processing a message.
+     *
+     * @param msgThread {@code True} if future was created in thread that is processing message.
+     */
+    public void messageThread(boolean msgThread);
+
+    /**
+     * @return {@code True} if future was created in thread that was processing message.
+     */
+    public boolean messageThread();
+
+    /**
+     * @return {@code True} if skip recovery for this operation.
+     */
+    public boolean skipRecovery();
+
+    /**
+     * Sets ack closure which will be applied when ack received.
+     *
+     * @param c Ack closure.
+     */
+    public void ackClosure(IgniteInClosure<IgniteException> c);
+
+    /**
+     * The method will be called when ack received.
+     */
+    public void onAckReceived();
+
+    /**
+     * @return Ack closure.
+     */
+    public IgniteInClosure<IgniteException> ackClosure();
+
+    /**
+     * @return Session.
+     */
+    public GridNioSession session();
+
+    /**
+     * @param ses Session.
+     */
+    public void resetSession(GridNioSession ses);
+
+    /**
+     * @param e Error to report to this request.
+     */
+    public void onError(Exception e);
+
+    /**
+     * @return Message.
+     */
+    public Object message();
+
+    /**
+     * Notifies this request that its message has been written.
+     */
+    public void onMessageWritten();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 63cdd83..2da8f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -249,9 +249,9 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         ByteBuffer input = checkMessage(ses, msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index 3272b8e..d65d576 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -430,7 +430,7 @@ class GridNioSslHandler extends ReentrantLock {
         while (!deferredWriteQueue.isEmpty()) {
             WriteRequest req = deferredWriteQueue.poll();
 
-            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer()));
+            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true));
         }
     }
 
@@ -475,7 +475,7 @@ class GridNioSslHandler extends ReentrantLock {
 
         ByteBuffer cp = copy(outNetBuf);
 
-        return parent.proceedSessionWrite(ses, cp);
+        return parent.proceedSessionWrite(ses, cp, true);
     }
 
     /**


Mime
View raw message