ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [36/50] [abbrv] ignite git commit: io - send future opts
Date Mon, 10 Oct 2016 14:57:41 GMT
io - send future opts


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/929d4a9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/929d4a9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/929d4a9f

Branch: refs/heads/ignite-gg-8-io2-park
Commit: 929d4a9f8fe39b2ea7262f7779af32a2389b6eb2
Parents: dc6bae8
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 3 15:18:31 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 3 15:50:04 2016 +0300

----------------------------------------------------------------------
 .../internal/util/ipc/IpcToNioAdapter.java      |   2 +-
 .../nio/GridConnectionBytesVerifyFilter.java    |   4 +-
 .../util/nio/GridNioAsyncNotifyFilter.java      |   4 +-
 .../internal/util/nio/GridNioCodecFilter.java   |   6 +-
 .../ignite/internal/util/nio/GridNioFilter.java |   4 +-
 .../internal/util/nio/GridNioFilterAdapter.java |   4 +-
 .../internal/util/nio/GridNioFilterChain.java   |   8 +-
 .../ignite/internal/util/nio/GridNioFuture.java |   4 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  35 +-
 .../ignite/internal/util/nio/GridNioServer.java | 416 ++++++++++++++-----
 .../internal/util/nio/GridNioSession.java       |   6 +
 .../internal/util/nio/GridNioSessionImpl.java   |  14 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  16 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  44 +-
 .../internal/util/nio/SessionWriteRequest.java  |  85 ++++
 .../internal/util/nio/ssl/GridNioSslFilter.java |   4 +-
 .../util/nio/ssl/GridNioSslHandler.java         |   4 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   7 +-
 .../nio/impl/GridNioFilterChainSelfTest.java    |  13 +-
 19 files changed, 497 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 6820dc7..d108b56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 13d7ca7..02a637d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -67,8 +67,8 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 9925d2e..8aa113a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -107,8 +107,8 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index a2f543d..f7a3022 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -76,15 +76,15 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         // No encoding needed in direct mode.
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         try {
             ByteBuffer res = parser.encode(ses, msg);
 
-            return proceedSessionWrite(ses, res);
+            return proceedSessionWrite(ses, res, fut);
         }
         catch (IOException e) {
             throw new GridNioException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index 5f88b1f..e749664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -108,7 +108,7 @@ public interface GridNioFilter {
      * @return Write future.
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
-    public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
 
     /**
      * Forwards session close request to the next logical filter in filter chain.
@@ -152,7 +152,7 @@ public interface GridNioFilter {
      * @return Write future.
      * @throws GridNioException If GridNioException occurred while handling event.
      */
-    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
 
     /**
      * Invoked when a new messages received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 18ab1b2..0a825bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -108,10 +108,10 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         checkNext();
 
-        return nextFilter.onSessionWrite(ses, msg);
+        return nextFilter.onSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index 8a43e29..6415b46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -181,8 +181,8 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
      * @return Send future.
      * @throws IgniteCheckedException If IgniteCheckedException occurred while handling event.
      */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return tail.onSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        return tail.onSessionWrite(ses, msg, fut);
     }
 
     /**
@@ -255,9 +255,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg)
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
             throws IgniteCheckedException {
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index b02acc8..6c0c9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
     /**
      * Sets ack closure which will be applied when ack received.
      *
-     * @param closure Ack closure.
+     * @param c Ack closure.
      */
-    public void ackClosure(IgniteInClosure<IgniteException> closure);
+    public void ackClosure(IgniteInClosure<IgniteException> c);
 
     /**
      * The method will be called when ack received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 4598eef..44d67c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -37,7 +37,7 @@ public class GridNioRecoveryDescriptor {
     private long acked;
 
     /** Unacknowledged message futures. */
-    private final ArrayDeque<GridNioFuture<?>> msgFuts;
+    private final ArrayDeque<SessionWriteRequest> msgFuts;
 
     /** Number of messages to resend. */
     private int resendCnt;
@@ -155,15 +155,15 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param fut NIO future.
+     * @param req Write request.
      * @return {@code False} if queue limit is exceeded.
      */
-    public boolean add(GridNioFuture<?> fut) {
-        assert fut != null;
+    public boolean add(SessionWriteRequest req) {
+        assert req != null;
 
-        if (!fut.skipRecovery()) {
+        if (!req.skipRecovery()) {
             if (resendCnt == 0) {
-                msgFuts.addLast(fut);
+                msgFuts.addLast(req);
 
                 sentCnt++;
 
@@ -185,14 +185,12 @@ public class GridNioRecoveryDescriptor {
                 ", msgFuts=" + msgFuts.size() + ']');
 
         while (acked < rcvCnt) {
-            GridNioFuture<?> fut = msgFuts.pollFirst();
+            SessionWriteRequest fut = msgFuts.pollFirst();
 
             assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
                 ", acked=" + acked +
                 ", desc=" + this + ']';
 
-            assert fut.isDone() : fut;
-
             if (fut.ackClosure() != null)
                 fut.ackClosure().apply(null);
 
@@ -215,7 +213,7 @@ public class GridNioRecoveryDescriptor {
      * @return {@code False} if descriptor is reserved.
      */
     public boolean onNodeLeft() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] futs = null;
 
         synchronized (this) {
             nodeLeft = true;
@@ -224,7 +222,7 @@ public class GridNioRecoveryDescriptor {
                 return false;
 
             if (!msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+                futs = msgFuts.toArray(new SessionWriteRequest[msgFuts.size()]);
 
                 msgFuts.clear();
             }
@@ -239,7 +237,7 @@ public class GridNioRecoveryDescriptor {
     /**
      * @return Message futures for unacknowledged messages.
      */
-    public Deque<GridNioFuture<?>> messagesFutures() {
+    public Deque<SessionWriteRequest> messagesFutures() {
         return msgFuts;
     }
 
@@ -337,7 +335,7 @@ public class GridNioRecoveryDescriptor {
      *
      */
     public void release() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] futs = null;
 
         synchronized (this) {
             connected = false;
@@ -358,7 +356,7 @@ public class GridNioRecoveryDescriptor {
             }
 
             if (nodeLeft && !msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+                futs = msgFuts.toArray(new SessionWriteRequest[msgFuts.size()]);
 
                 msgFuts.clear();
             }
@@ -428,11 +426,12 @@ public class GridNioRecoveryDescriptor {
     /**
      * @param futs Futures to complete.
      */
-    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
-        for (GridNioFuture<?> msg : futs) {
-            IOException e = new IOException("Failed to send message, node has left: " + node.id());
+    private void completeOnNodeLeft(SessionWriteRequest[] futs) {
+        IOException e = new IOException("Failed to send message, node has left: " + node.id());
 
-            ((GridNioFutureImpl)msg).onDone(e);
+        for (SessionWriteRequest msg : futs) {
+            if (msg instanceof GridNioFutureImpl)
+                ((GridNioFutureImpl)msg).onDone(e);
 
             if (msg.ackClosure() != null)
                 msg.ackClosure().apply(new IgniteException(e));

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 97d676a..ccaacf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -414,16 +414,25 @@ public class GridNioServer<T> {
      * @param msg Message.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) {
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
@@ -431,17 +440,26 @@ public class GridNioServer<T> {
      * @param msg Message.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg) {
+    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
+        if (createFut) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
 
-        send0(impl, fut, false);
+            send0(impl, fut, false);
 
-        return fut;
+            return fut;
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+
+            send0(impl, req, false);
+
+            return null;
+        }
     }
 
     /**
@@ -449,7 +467,7 @@ public class GridNioServer<T> {
      * @param fut Future.
      * @param sys System message flag.
      */
-    private void send0(GridSelectorNioSessionImpl ses, NioOperationFuture<?> fut, boolean sys) {
+    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest fut, boolean sys) {
         assert ses != null;
         assert fut != null;
 
@@ -465,7 +483,7 @@ public class GridNioServer<T> {
                 fut.connectionClosed();
         }
         else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true))
-                clientWorkers.get(ses.selectorIndex()).offer(fut);
+            clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)fut);
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -476,10 +494,9 @@ public class GridNioServer<T> {
      *
      * @param ses Session.
      * @param msg Message.
-     * @return Future.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses, Message msg) {
-        return sendSystem(ses, msg, null);
+    public void sendSystem(GridNioSession ses, Message msg) {
+        sendSystem(ses, msg, null);
     }
 
     /**
@@ -488,27 +505,29 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param lsnr Future listener notified from the session thread.
-     * @return Future.
      */
-    public GridNioFuture<?> sendSystem(GridNioSession ses,
+    public void sendSystem(GridNioSession ses,
         Message msg,
         @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-            skipRecoveryPred.apply(msg));
-
         if (lsnr != null) {
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+                skipRecoveryPred.apply(msg));
+
             fut.listen(lsnr);
 
             assert !fut.isDone();
-        }
 
-        send0(impl, fut, true);
+            send0(impl, fut, true);
+        }
+        else {
+            SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
 
-        return fut;
+            send0(impl, req, true);
+        }
     }
 
     /**
@@ -520,16 +539,16 @@ public class GridNioServer<T> {
         GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
         if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) {
-            Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
+            Deque<SessionWriteRequest> futs = recoveryDesc.messagesFutures();
 
             if (log.isDebugEnabled())
                 log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
 
             GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
 
-            GridNioFuture<?> fut0 = futs.iterator().next();
+            SessionWriteRequest fut0 = futs.iterator().next();
 
-            for (GridNioFuture<?> fut : futs) {
+            for (SessionWriteRequest fut : futs) {
                 fut.messageThread(true);
 
                 ((NioOperationFuture)fut).resetSession(ses0);
@@ -835,13 +854,13 @@ public class GridNioServer<T> {
 
             while (true) {
                 ByteBuffer buf = ses.removeMeta(BUF_META_KEY);
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 // Check if there were any pending data from previous writes.
                 if (buf == null) {
                     assert req == null;
 
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                    req = ses.pollFuture();
 
                     if (req == null) {
                         if (ses.procWrite.get()) {
@@ -858,7 +877,7 @@ public class GridNioServer<T> {
                         break;
                     }
 
-                    buf = req.message();
+                    buf = (ByteBuffer)req.message();
                 }
 
                 if (!skipWrite) {
@@ -893,7 +912,7 @@ public class GridNioServer<T> {
                     // Message was successfully written.
                     assert req != null;
 
-                    req.onDone();
+                    req.onMessageWritten();
                 }
             }
         }
@@ -1051,14 +1070,14 @@ public class GridNioServer<T> {
                 if (ses.meta(WRITE_BUF_LIMIT) != null)
                     buf.limit((int)ses.meta(WRITE_BUF_LIMIT));
 
-                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 while (true) {
                     if (req == null) {
                         req = systemMessage(ses);
 
                         if (req == null) {
-                            req = (NioOperationFuture<?>)ses.pollFuture();
+                            req = ses.pollFuture();
 
                             if (req == null && buf.position() == 0) {
                                 if (ses.procWrite.get()) {
@@ -1081,7 +1100,7 @@ public class GridNioServer<T> {
                     boolean finished = false;
 
                     if (req != null) {
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1096,17 +1115,17 @@ public class GridNioServer<T> {
 
                     // Fill up as many messages as possible to write buffer.
                     while (finished) {
-                        req.onDone();
+                        req.onMessageWritten();
 
                         req = systemMessage(ses);
 
                         if (req == null)
-                            req = (NioOperationFuture<?>)ses.pollFuture();
+                            req = ses.pollFuture();
 
                         if (req == null)
                             break;
 
-                        msg = req.directMessage();
+                        msg = (Message)req.message();
 
                         assert msg != null;
 
@@ -1213,14 +1232,11 @@ public class GridNioServer<T> {
          * @param ses Session.
          * @return System message request.
          */
-        private NioOperationFuture<?> systemMessage(GridSelectorNioSessionImpl ses) {
+        private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) {
             if (ses.hasSystemMessage()) {
                 Object msg = ses.systemMessage();
 
-                NioOperationFuture req = new NioOperationFuture<>(ses,
-                    NioOperation.REQUIRE_WRITE,
-                    (Message)msg,
-                    true);
+                SessionWriteRequest req = new WriteRequestSystemImpl(ses, msg);
 
                 assert !ses.hasSystemMessage();
 
@@ -1242,7 +1258,7 @@ public class GridNioServer<T> {
 
             GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
             ByteBuffer buf = ses.writeBuffer();
-            NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+            SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
@@ -1259,7 +1275,7 @@ public class GridNioServer<T> {
                 req = systemMessage(ses);
 
                 if (req == null) {
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                    req = ses.pollFuture();
 
                     if (req == null && buf.position() == 0) {
                         if (ses.procWrite.get()) {
@@ -1282,9 +1298,9 @@ public class GridNioServer<T> {
             boolean finished = false;
 
             if (req != null) {
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
-                assert msg != null;
+                assert msg != null : req;
 
                 if (writer != null)
                     writer.setCurrentWriteClass(msg.getClass());
@@ -1297,17 +1313,17 @@ public class GridNioServer<T> {
 
             // Fill up as many messages as possible to write buffer.
             while (finished) {
-                req.onDone();
+                req.onMessageWritten();
 
                 req = systemMessage(ses);
 
                 if (req == null)
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                    req = ses.pollFuture();
 
                 if (req == null)
                     break;
 
-                msg = req.directMessage();
+                msg = (Message)req.message();
 
                 assert msg != null;
 
@@ -1360,7 +1376,7 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker {
         /** Queue of change requests on this selector. */
-        private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+        private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;
@@ -1468,7 +1484,7 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        private void offer(NioOperationFuture req) {
+        private void offer(SessionChangeRequest req) {
             changeReqs.offer(req);
 
             selector.wakeup();
@@ -1485,23 +1501,27 @@ public class GridNioServer<T> {
                 long lastIdleCheck = U.currentTimeMillis();
 
                 while (!closed && selector.isOpen()) {
-                    NioOperationFuture req;
+                    SessionChangeRequest req0;
 
-                    while ((req = changeReqs.poll()) != null) {
-                        switch (req.operation()) {
+                    while ((req0 = changeReqs.poll()) != null) {
+                        switch (req0.operation()) {
                             case REGISTER: {
-                                register(req);
+                                register((NioOperationFuture)req0);
 
                                 break;
                             }
 
                             case REQUIRE_WRITE: {
-                                registerWrite(req.session());
+                                SessionWriteRequest req = (SessionWriteRequest)req0;
+
+                                registerWrite((GridSelectorNioSessionImpl)req.session());
 
                                 break;
                             }
 
                             case CLOSE: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 if (close(req.session(), null))
                                     req.onDone(true);
                                 else
@@ -1511,6 +1531,8 @@ public class GridNioServer<T> {
                             }
 
                             case PAUSE_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1529,6 +1551,8 @@ public class GridNioServer<T> {
                             }
 
                             case RESUME_READ: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
                                 SelectionKey key = req.session().key();
 
                                 if (key.isValid()) {
@@ -1547,10 +1571,15 @@ public class GridNioServer<T> {
                             }
 
                             case DUMP_STATS: {
-                                dumpStats();
+                                NioOperationFuture req = (NioOperationFuture)req0;
 
-                                // Complete the request just in case (none should wait on this future).
-                                req.onDone(true);
+                                try {
+                                    dumpStats();
+                                }
+                                finally {
+                                    // Complete the request just in case (none should wait on this future).
+                                    req.onDone(true);
+                                }
                             }
                         }
                     }
@@ -1668,7 +1697,7 @@ public class GridNioServer<T> {
 
                 int cnt = 0;
 
-                for (GridNioFuture<?> fut : ses.writeQueue()) {
+                for (SessionWriteRequest fut : ses.writeQueue()) {
                     if (cnt == 0)
                         sb.append(",\n opQueue=[").append(fut);
                     else
@@ -1959,7 +1988,7 @@ public class GridNioServer<T> {
                 ses.removeMeta(BUF_META_KEY);
 
                 // Since ses is in closed state, no write requests will be added.
-                NioOperationFuture<?> fut = ses.removeMeta(NIO_OPERATION.ordinal());
+                SessionWriteRequest fut = ses.removeMeta(NIO_OPERATION.ordinal());
 
                 GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
                 GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
@@ -1967,7 +1996,7 @@ public class GridNioServer<T> {
                 if (outRecovery != null || inRecovery != null) {
                     try {
                         // Poll will update recovery data.
-                        while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null) {
+                        while ((fut = ses.pollFuture()) != null) {
                             if (fut.skipRecovery())
                                 fut.connectionClosed();
                         }
@@ -1984,7 +2013,7 @@ public class GridNioServer<T> {
                     if (fut != null)
                         fut.connectionClosed();
 
-                    while ((fut = (NioOperationFuture<?>)ses.pollFuture()) != null)
+                    while ((fut = ses.pollFuture()) != null)
                         fut.connectionClosed();
                 }
 
@@ -2209,9 +2238,193 @@ public class GridNioServer<T> {
     }
 
     /**
+     *
+     */
+    private static final class WriteRequestSystemImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** */
+        private final Object msg;
+
+        /** */
+        private final GridNioSession ses;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         */
+        WriteRequestSystemImpl(GridNioSession ses, Object msg) {
+            this.ses = ses;
+            this.msg = msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean messageThread() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void connectionClosed() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestSystemImpl.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
+        /** */
+        private GridNioSession ses;
+
+        /** */
+        private final Object msg;
+
+        /** */
+        private boolean msgThread;
+
+        /** */
+        private final boolean skipRecovery;
+
+        /** */
+        private IgniteInClosure<IgniteException> ackC;
+
+        /**
+         * @param ses Session.
+         * @param msg Message.
+         * @param skipRecovery Skip recovery flag.
+         */
+        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+            this.ses = ses;
+            this.msg = msg;
+            this.skipRecovery = skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void messageThread(boolean msgThread) {
+            this.msgThread = msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean messageThread() {
+            return msgThread;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipRecovery() {
+            return skipRecovery;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
+            ackC = c;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            assert msg instanceof Message;
+
+            ((Message)msg).onAckReceived();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInClosure<IgniteException> ackClosure() {
+            return ackC;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void connectionClosed() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object message() {
+            return msg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resetSession(GridNioSession ses) {
+            this.ses = ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioSession session() {
+            return ses;
+        }
+
+        /** {@inheritDoc} */
+        @Override public NioOperation operation() {
+            return NioOperation.REQUIRE_WRITE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(WriteRequestImpl.class, this);
+        }
+    }
+
+    /**
      * Class for requesting write and session close operations.
      */
-    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> {
+    private static class NioOperationFuture<R> extends GridNioFutureImpl<R> implements SessionWriteRequest,
+        SessionChangeRequest {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -2227,11 +2440,7 @@ public class GridNioServer<T> {
         private NioOperation op;
 
         /** Message. */
-        @GridToStringExclude
-        private ByteBuffer msg;
-
-        /** Direct message. */
-        private Message commMsg;
+        private Object msg;
 
         /** */
         @GridToStringExclude
@@ -2293,8 +2502,7 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          * @param msg Message.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            ByteBuffer msg) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -2322,38 +2530,25 @@ public class GridNioServer<T> {
 
             this.ses = ses;
             this.op = op;
-            this.commMsg = commMsg;
+            this.msg = commMsg;
             this.skipRecovery = skipRecovery;
         }
 
-        /**
-         * @return Requested change operation.
-         */
-        private NioOperation operation() {
+        /** {@inheritDoc} */
+        public NioOperation operation() {
             return op;
         }
 
-        /**
-         * @return Message.
-         */
-        private ByteBuffer message() {
+        /** {@inheritDoc} */
+        public Object message() {
             return msg;
         }
 
-        /**
-         * @return Direct message.
-         */
-        private Message directMessage() {
-            return commMsg;
-        }
-
-        /**
-         * @param ses New session instance.
-         */
-        private void resetSession(GridSelectorNioSessionImpl ses) {
-            assert commMsg != null;
+        /** {@inheritDoc} */
+        public void resetSession(GridNioSession ses) {
+            assert msg instanceof Message : msg;
 
-            this.ses = ses;
+            this.ses = (GridSelectorNioSessionImpl)ses;
         }
 
         /**
@@ -2363,10 +2558,8 @@ public class GridNioServer<T> {
             return sockCh;
         }
 
-        /**
-         * @return Session for this change request.
-         */
-        private GridSelectorNioSessionImpl session() {
+        /** {@inheritDoc} */
+        public GridSelectorNioSessionImpl session() {
             return ses;
         }
 
@@ -2384,10 +2577,8 @@ public class GridNioServer<T> {
             return meta;
         }
 
-        /**
-         * Applicable to write futures only. Fails future with corresponding IOException.
-         */
-        private void connectionClosed() {
+        /** {@inheritDoc} */
+        public void connectionClosed() {
             assert op == NioOperation.REQUIRE_WRITE;
             assert ses != null;
 
@@ -2396,9 +2587,14 @@ public class GridNioServer<T> {
 
         /** {@inheritDoc} */
         @Override public void onAckReceived() {
-            assert commMsg != null;
+            assert msg instanceof Message : msg;
+
+            ((Message)msg).onAckReceived();
+        }
 
-            commMsg.onAckReceived();
+        /** {@inheritDoc} */
+        @Override public void onMessageWritten() {
+            onDone();
         }
 
         /** {@inheritDoc} */
@@ -2442,7 +2638,7 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
@@ -2461,10 +2657,10 @@ public class GridNioServer<T> {
                     return null;
                 }
                 else
-                    return send(ses, (Message)msg);
+                    return send(ses, (Message)msg, fut);
             }
             else
-                return send(ses, (ByteBuffer)msg);
+                return send(ses, (ByteBuffer)msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -2832,4 +3028,14 @@ public class GridNioServer<T> {
             return this;
         }
     }
+
+    /**
+     *
+     */
+    interface SessionChangeRequest {
+        /**
+         * @return Requested change operation.
+         */
+        NioOperation operation();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index bd60e52..f1981b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -105,6 +106,11 @@ public interface GridNioSession {
     public GridNioFuture<?> send(Object msg);
 
     /**
+     * @param msg Message to be sent.
+     */
+    public void sendNoFuture(Object msg) throws IgniteCheckedException;
+
+    /**
      * Gets metadata associated with specified key.
      *
      * @param key Key to look up.

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 04c47b9..594bca2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -99,7 +99,7 @@ public class GridNioSessionImpl implements GridNioSession {
         try {
             resetSendScheduleTime();
 
-            return chain().onSessionWrite(this, msg);
+            return chain().onSessionWrite(this, msg, true);
         }
         catch (IgniteCheckedException e) {
             close();
@@ -109,6 +109,18 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+        try {
+            chain().onSessionWrite(this, msg, false);
+        }
+        catch (IgniteCheckedException e) {
+            close();
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public GridNioFuture<?> resumeReads() {
         try {
             return chain().onResumeReads(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 8c6996c..6dc53af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+    private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude
@@ -169,7 +169,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request.
      * @return Updated size of the queue.
      */
-    int offerSystemFuture(GridNioFuture<?> writeFut) {
+    int offerSystemFuture(SessionWriteRequest writeFut) {
         writeFut.messageThread(true);
 
         boolean res = queue.offerFirst(writeFut);
@@ -189,7 +189,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request to add.
      * @return Updated size of the queue.
      */
-    int offerFuture(GridNioFuture<?> writeFut) {
+    int offerFuture(SessionWriteRequest writeFut) {
         boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();
 
         if (sem != null && !msgThread)
@@ -207,7 +207,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @param futs Futures to resend.
      */
-    void resend(Collection<GridNioFuture<?>> futs) {
+    void resend(Collection<SessionWriteRequest> futs) {
         assert queue.isEmpty() : queue.size();
 
         boolean add = queue.addAll(futs);
@@ -218,8 +218,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @return Message that is in the head of the queue, {@code null} if queue is empty.
      */
-    @Nullable GridNioFuture<?> pollFuture() {
-        GridNioFuture<?> last = queue.poll();
+    @Nullable SessionWriteRequest pollFuture() {
+        SessionWriteRequest last = queue.poll();
 
         if (last != null) {
             if (sem != null && !last.messageThread())
@@ -249,7 +249,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param fut Future.
      * @return {@code True} if future was removed from queue.
      */
-    boolean removeFuture(GridNioFuture<?> fut) {
+    boolean removeFuture(SessionWriteRequest fut) {
         assert closed();
 
         return queue.removeLastOccurrence(fut);
@@ -267,7 +267,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /**
      * @return Write requests.
      */
-    Collection<GridNioFuture<?>> writeQueue() {
+    Collection<SessionWriteRequest> writeQueue() {
         return queue;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index fcb40c7..c98c3aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -108,40 +108,36 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> c)
         throws IgniteCheckedException {
-        // Node ID is never provided in asynchronous send mode.
-        assert nodeId == null;
+        try {
+            // Node ID is never provided in asynchronous send mode.
+            assert nodeId == null;
 
-        if (closure != null)
-            ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+            if (c != null)
+                ses.addMeta(ACK_CLOSURE.ordinal(), c);
 
-        GridNioFuture<?> fut = ses.send(msg);
+            ses.sendNoFuture(msg);
 
-        if (fut.isDone()) {
-            try {
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                if (closure != null)
-                    ses.removeMeta(ACK_CLOSURE.ordinal());
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
+        }
+        catch (IgniteCheckedException e) {
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
 
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
-                if (e.getCause() instanceof IOException) {
-                    ses.close();
+            if (e.getCause() instanceof IOException) {
+                ses.close();
 
-                    return true;
-                }
-                else
-                    throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+                return true;
             }
+            else
+                throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
         }
 
-        if (closure != null)
-            ses.removeMeta(ACK_CLOSURE.ordinal());
-
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
new file mode 100644
index 0000000..b159e39
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ *
+ */
+public interface SessionWriteRequest {
+    /**
+     * Sets flag indicating that message send future was created in thread that was processing a message.
+     *
+     * @param msgThread {@code True} if future was created in thread that is processing message.
+     */
+    public void messageThread(boolean msgThread);
+
+    /**
+     * @return {@code True} if future was created in thread that was processing message.
+     */
+    public boolean messageThread();
+
+    /**
+     * @return {@code True} if skip recovery for this operation.
+     */
+    public boolean skipRecovery();
+
+    /**
+     * Sets ack closure which will be applied when ack received.
+     *
+     * @param c Ack closure.
+     */
+    public void ackClosure(IgniteInClosure<IgniteException> c);
+
+    /**
+     * The method will be called when ack received.
+     */
+    public void onAckReceived();
+
+    /**
+     * @return Ack closure.
+     */
+    public IgniteInClosure<IgniteException> ackClosure();
+
+    /**
+     * @return Session.
+     */
+    public GridNioSession session();
+
+    /**
+     * @param ses Session.
+     */
+    public void resetSession(GridNioSession ses);
+
+    /**
+     *
+     */
+    public void connectionClosed();
+
+    /**
+     * @return Message.
+     */
+    public Object message();
+
+    /**
+     *
+     */
+    public void onMessageWritten();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 63cdd83..2da8f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -249,9 +249,9 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         ByteBuffer input = checkMessage(ses, msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index 3272b8e..d65d576 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -430,7 +430,7 @@ class GridNioSslHandler extends ReentrantLock {
         while (!deferredWriteQueue.isEmpty()) {
             WriteRequest req = deferredWriteQueue.poll();
 
-            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer()));
+            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true));
         }
     }
 
@@ -475,7 +475,7 @@ class GridNioSslHandler extends ReentrantLock {
 
         ByteBuffer cp = copy(outNetBuf);
 
-        return parent.proceedSessionWrite(ses, cp);
+        return parent.proceedSessionWrite(ses, cp, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index bda2fcb..b5b9d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -355,7 +355,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
-                    ses.send(nodeIdMessage());
+                    try {
+                        ses.sendNoFuture(nodeIdMessage());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/929d4a9f/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index ab6ee5c..d403784 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
                 proceedExceptionCaught(ses, ex);
             }
 
-            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
                 sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME));
 
                 sndMsgObj.compareAndSet(null, msg);
@@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         chain.onSessionIdleTimeout(ses);
         chain.onSessionWriteTimeout(ses);
         assertNull(chain.onSessionClose(ses));
-        assertNull(chain.onSessionWrite(ses, snd));
+        assertNull(chain.onSessionWrite(ses, snd, true));
 
         assertEquals("DCBA", connectedEvt.get());
         assertEquals("DCBA", disconnectedEvt.get());
@@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
             chainMeta(ses, MESSAGE_WRITE_META_NAME);
 
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public void sendNoFuture(Object msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public GridNioFuture<Object> resumeReads() {
             return null;
         }


Mime
View raw message