ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [6/7] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NIO sessions
Date Fri, 09 Dec 2016 09:29:09 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 1b11688..87d9225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -611,6 +611,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 1c1addd..c3e9fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -125,6 +125,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         );
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
     /**
      * @param key Key to add.
      * @param val Optional update value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index fa7f367..4272a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -244,6 +244,11 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
         return accessTtl;
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /**
      * @param ctx Cache context.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a419887..bc16ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
      */
     @SuppressWarnings("unchecked")
     protected IgniteInternalFuture asyncOp(final Callable<?> op) {
+        if (!asyncToggled)
+            return ctx.closures().callLocalSafe(op);
+
         IgniteInternalFuture fail = asyncOpAcquire();
 
         if (fail != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d34047e..eb5e214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -272,7 +272,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 false,
                 null,
                 req.keyValueFilter(),
-                req.partition(),
+                req.partition() == -1 ? null : req.partition(),
                 req.className(),
                 req.clause(),
                 req.includeMetaData(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 60c4662..9f965d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -121,7 +121,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     private int taskHash;
 
     /** Partition. */
-    private int part;
+    private int part = -1;
 
     /** */
     private AffinityTopologyVersion topVer;
@@ -478,8 +478,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     /**
      * @return partition.
      */
-    @Nullable public Integer partition() {
-        return part == -1 ? null : part;
+    public int partition() {
+        return part;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6d21dcf..393fb1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
-        AffinityTopologyVersion topVer,
+        final AffinityTopologyVersion topVer,
         final boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
@@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                             while (true) {
-                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
 
                                 try {
                                     GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
@@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
                         assert txEntry != null || readCommitted() || skipVals;
 
-                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
 
                         if (readCommitted() || skipVals) {
                             cacheCtx.evicts().touch(e, topologyVersion());
@@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             IgniteTxLocalAdapter.this,
                                             /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
                                             /*unmarshal*/true,
-                                            /**update-metrics*/true,
+                                            /*update-metrics*/true,
                                             /*event*/!skipVals,
                                             CU.subjectId(IgniteTxLocalAdapter.this, cctx),
                                             transformClo,

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             if (!allowOverwrite)
                 cctx.topology().readLock();
 
+            GridDhtTopologyFuture topWaitFut = null;
+
             try {
                 GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
                     waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
                 }
-                else {
-                    fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                            localUpdate(nodeId, req, updater, topic);
-                        }
-                    });
-                }
+                else
+                    topWaitFut = fut;
             }
             finally {
                 if (!allowOverwrite)
                     cctx.topology().readUnlock();
             }
 
+            if (topWaitFut != null) {
+                // Need to call 'listen' after the topology read lock is released.
+                topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                        localUpdate(nodeId, req, updater, topic);
+                    }
+                });
+
+                return;
+            }
+
             if (job != null) {
                 try {
                     job.call();

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3405b53..4c037b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -226,7 +226,7 @@ public class IgfsContext {
      */
     public void runInIgfsThreadPool(Runnable r) {
         try {
-            igfsSvc.submit(r);
+            igfsSvc.execute(r);
         }
         catch (RejectedExecutionException ignored) {
             // This exception will happen if network speed is too low and data comes faster
@@ -252,4 +252,4 @@ public class IgfsContext {
 
         return mgr;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e534800..4490a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager {
         IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key);
 
         if (secReader != null) {
+            Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL);
+
             fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() {
                 @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException {
                     byte[] res = fut.get();
@@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager {
 
                     return res;
                 }
-            });
+            }, exec);
         }
         else
             igfsCtx.metrics().addReadBlocks(1, 0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index ab4ee85..6b23e80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -326,7 +326,7 @@ public final class IgfsImpl implements IgfsEx {
                 // Submit it to the thread pool immediately.
                 assert dualPool != null;
 
-                dualPool.submit(batch);
+                dualPool.execute(batch);
 
                 // Spin in case another batch is currently running.
                 while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index 9388a8e..7cba9bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -139,6 +139,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
                             .logger(log)
                             .selectorCount(DFLT_SELECTOR_CNT)
                             .gridName(ctx.gridName())
+                            .serverName("odbc")
                             .tcpNoDelay(DFLT_TCP_NODELAY)
                             .directBuffer(DFLT_TCP_DIRECT_BUF)
                             .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index fd1c2d4..9d9a4d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.compute;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.binary.BinaryObject;
@@ -410,6 +411,11 @@ public class PlatformCompute extends PlatformAbstractTarget {
         }
 
         /** {@inheritDoc} */
+        @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec) {
+            throw new UnsupportedOperationException("Chain operation is not supported.");
+        }
+
+        /** {@inheritDoc} */
         @Override public Throwable error() {
             return fut.error();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
index b403654..71eca65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.rest.GridRestCommand;
 import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
@@ -38,8 +37,6 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_DECREMENT;
@@ -72,24 +69,16 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
     /** Handler. */
     private final GridRestProtocolHandler hnd;
 
-    /** JDK marshaller. */
-    private final Marshaller jdkMarshaller = new JdkMarshaller();
-
-    /** Context. */
-    private final GridKernalContext ctx;
-
     /**
      * Creates listener which will convert incoming tcp packets to rest requests and forward them to
      * a given rest handler.
      *
      * @param log Logger to use.
      * @param hnd Rest handler.
-     * @param ctx Context.
      */
-    public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd, GridKernalContext ctx) {
+    public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd) {
         this.log = log;
         this.hnd = hnd;
-        this.ctx = ctx;
     }
 
     /** {@inheritDoc} */
@@ -462,4 +451,4 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
 
         return new GridTuple3<>(cmd, quiet, retKey);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index 1c1c6dc..3ba6d8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -145,7 +145,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
      */
     public GridTcpRestNioListener(IgniteLogger log, GridTcpRestProtocol proto, GridRestProtocolHandler hnd,
         GridKernalContext ctx) {
-        memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd, ctx);
+        memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd);
         redisLsnr = new GridRedisNioListener(log, hnd, ctx);
 
         this.log = log;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index 6338fcc..2a002a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -257,6 +257,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
                 .logger(log)
                 .selectorCount(cfg.getSelectorCount())
                 .gridName(ctx.gridName())
+                .serverName("tcp-rest")
                 .tcpNoDelay(cfg.isNoDelay())
                 .directBuffer(cfg.isDirectBuffer())
                 .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b9b92b8..9a5f077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1131,7 +1131,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             // Start service in its own thread.
             final ExecutorService exe = svcCtx.executor();
 
-            exe.submit(new Runnable() {
+            exe.execute(new Runnable() {
                 @Override public void run() {
                     try {
                         svc.execute(svcCtx);
@@ -1394,7 +1394,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 return;
 
             try {
-                depExe.submit(new BusyRunnable() {
+                depExe.execute(new BusyRunnable() {
                     @Override public void run0() {
                         onSystemCacheUpdated(deps);
                     }
@@ -1587,7 +1587,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 else
                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
 
-                depExe.submit(new BusyRunnable() {
+                depExe.execute(new BusyRunnable() {
                     @Override public void run0() {
                         ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e1937bb..3dfb3c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -153,7 +153,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.cluster.ClusterGroupEmptyException;
@@ -506,10 +505,27 @@ public abstract class IgniteUtils {
         }
     };
 
-    /**
-     * Initializes enterprise check.
+    /** */
+    private static final boolean assertionsEnabled;
+
+    /*
+     *
      */
     static {
+        boolean assertionsEnabled0 = true;
+
+        try {
+            assert false;
+
+            assertionsEnabled0 = false;
+        }
+        catch (AssertionError ignored) {
+            assertionsEnabled0 = true;
+        }
+        finally {
+            assertionsEnabled = assertionsEnabled0;
+        }
+
         String osName = System.getProperty("os.name");
 
         String osLow = osName.toLowerCase();
@@ -1284,6 +1300,27 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * @param threadId Thread ID.
+     * @param sb Builder.
+     */
+    public static void printStackTrace(long threadId, GridStringBuilder sb) {
+        ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+        ThreadInfo threadInfo = mxBean.getThreadInfo(threadId, Integer.MAX_VALUE);
+
+        printThreadInfo(threadInfo, sb, Collections.<Long>emptySet());
+    }
+
+    /**
+     * @return {@code true} if there is a Java-level deadlock.
+     */
+    public static boolean deadlockPresent() {
+        ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+        return !F.isEmpty(mxBean.findDeadlockedThreads());
+    }
+
+    /**
      * Prints single thread info to a buffer.
      *
      * @param threadInfo Thread info.
@@ -6141,6 +6178,13 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * @return {@code True} if assertions are enabled.
+     */
+    public static boolean assertionsEnabled() {
+        return assertionsEnabled;
+    }
+
+    /**
      * Gets OS JDK string.
      *
      * @return OS JDK string.
@@ -8337,6 +8381,18 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+     *
+     * @param i Argument.
+     * @return Absolute value.
+     */
+    public static long safeAbs(long i) {
+        i = Math.abs(i);
+
+        return i < 0 ? 0 : i;
+    }
+
+    /**
      * Gets wrapper class for a primitive type.
      *
      * @param cls Class. If {@code null}, method is no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
new file mode 100644
index 0000000..e9ec74b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Striped executor.
+ */
+public class StripedExecutor implements ExecutorService {
+    /** Stripes. */
+    private final Stripe[] stripes;
+
+    /** For starvation checks. */
+    private final long[] completedCntrs;
+
+    /** */
+    private final IgniteLogger log;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Count.
+     */
+    public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) {
+        A.ensure(cnt > 0, "cnt > 0");
+
+        boolean success = false;
+
+        stripes = new Stripe[cnt];
+
+        completedCntrs = new long[cnt];
+
+        Arrays.fill(completedCntrs, -1);
+
+        this.log = log;
+
+        try {
+            for (int i = 0; i < cnt; i++) {
+                stripes[i] = new StripeConcurrentQueue(
+                    gridName,
+                    poolName,
+                    i,
+                    log);
+
+                stripes[i].start();
+            }
+
+            success = true;
+        }
+        catch (Error | RuntimeException e) {
+            U.error(log, "Failed to initialize striped pool.", e);
+
+            throw e;
+        }
+        finally {
+            if (!success) {
+                for (Stripe stripe : stripes) {
+                    if (stripe != null)
+                        stripe.signalStop();
+                }
+
+                for (Stripe stripe : stripes) {
+                    if (stripe != null)
+                        stripe.awaitStop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks starvation in striped pool: logs a warning with the queue and stack trace of every stripe that is
+     * still running a command and has completed none since the previous check (verbose, but speeds up debugging).
+     */
+    public void checkStarvation() {
+        for (int i = 0; i < stripes.length; i++) {
+            Stripe stripe = stripes[i];
+
+            long completedCnt = stripe.completedCnt;
+
+            boolean active = stripe.active;
+
+            if (completedCntrs[i] != -1 &&
+                completedCntrs[i] == completedCnt &&
+                active) {
+                boolean deadlockPresent = U.deadlockPresent();
+
+                GridStringBuilder sb = new GridStringBuilder();
+
+                sb.a(">>> Possible starvation in striped pool: ")
+                    .a(stripe.thread.getName()).a(U.nl())
+                    .a(stripe.queueToString()).a(U.nl())
+                    .a("deadlock: ").a(deadlockPresent).a(U.nl())
+                    .a("completed: ").a(completedCnt).a(U.nl());
+
+                U.printStackTrace(
+                    stripe.thread.getId(),
+                    sb);
+
+                String msg = sb.toString();
+
+                U.warn(log, msg);
+            }
+
+            if (active || completedCnt > 0)
+                completedCntrs[i] = completedCnt;
+        }
+    }
+
+    /**
+     * @return Stripes count.
+     */
+    public int stripes() {
+        return stripes.length;
+    }
+
+    /**
+     * Execute command.
+     *
+     * @param idx Index.
+     * @param cmd Command.
+     */
+    public void execute(int idx, Runnable cmd) {
+        if (idx == -1)
+            execute(cmd);
+        else {
+            assert idx >= 0 : idx;
+
+            stripes[idx % stripes.length].execute(cmd);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        signalStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void execute(@NotNull Runnable cmd) {
+        stripes[ThreadLocalRandom.current().nextInt(stripes.length)].execute(cmd);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Empty list (always).
+     */
+    @NotNull @Override public List<Runnable> shutdownNow() {
+        signalStop();
+
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} The timeout is ignored: blocks until every stripe thread is joined, then returns {@code true}. */
+    @Override public boolean awaitTermination(
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException {
+        awaitStop();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isShutdown() {
+        for (Stripe stripe : stripes) {
+            if (stripe != null && stripe.stopping)
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTerminated() {
+        for (Stripe stripe : stripes) {
+            if (stripe.thread.getState() != Thread.State.TERMINATED)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops executor.
+     */
+    public void stop() {
+        signalStop();
+
+        awaitStop();
+    }
+
+    /**
+     * Signals all stripes.
+     */
+    private void signalStop() {
+        for (Stripe stripe : stripes)
+            stripe.signalStop();
+    }
+
+    /**
+     * @throws IgniteInterruptedException If interrupted.
+     */
+    private void awaitStop() throws IgniteInterruptedException {
+        for (Stripe stripe : stripes)
+            stripe.awaitStop();
+    }
+
+    /**
+     * @return Return total queue size of all stripes.
+     */
+    public int queueSize() {
+        int size = 0;
+
+        for (Stripe stripe : stripes)
+            size += stripe.queueSize();
+
+        return size;
+    }
+
+    /**
+     * @return Completed tasks count.
+     */
+    public long completedTasks() {
+        long cnt = 0;
+
+        for (Stripe stripe : stripes)
+            cnt += stripe.completedCnt;
+
+        return cnt;
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> Future<T> submit(
+        @NotNull Runnable task,
+        T res
+    ) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public Future<?> submit(@NotNull Runnable task) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> List<Future<T>> invokeAll(
+        @NotNull Collection<? extends Callable<T>> tasks,
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @Override public <T> T invokeAny(
+        @NotNull Collection<? extends Callable<T>> tasks,
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StripedExecutor.class, this);
+    }
+
+    /**
+     * Base class for a stripe: one dedicated thread that runs the commands taken from the stripe's queue.
+     */
+    private static abstract class Stripe implements Runnable {
+        /** */
+        private final String gridName;
+
+        /** */
+        private final String poolName;
+
+        /** */
+        private final int idx;
+
+        /** */
+        private final IgniteLogger log;
+
+        /** Stopping flag. */
+        private volatile boolean stopping;
+
+        /** */
+        private volatile long completedCnt;
+
+        /** */
+        private volatile boolean active;
+
+        /** Thread executing the loop. */
+        protected Thread thread;
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public Stripe(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            this.gridName = gridName;
+            this.poolName = poolName;
+            this.idx = idx;
+            this.log = log;
+        }
+
+        /**
+         * Starts the stripe.
+         */
+        void start() {
+            thread = new IgniteThread(gridName, poolName + "-stripe-" + idx, this);
+
+            thread.start();
+        }
+
+        /**
+         * Stop the stripe.
+         */
+        void signalStop() {
+            stopping = true;
+
+            U.interrupt(thread);
+        }
+
+        /**
+         * Await thread stop.
+         */
+        void awaitStop() {
+            try {
+                if (thread != null)
+                    thread.join();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stopping) {
+                Runnable cmd;
+
+                try {
+                    cmd = take();
+
+                    if (cmd != null) {
+                        active = true;
+
+                        try {
+                            cmd.run();
+                        }
+                        finally {
+                            active = false;
+                            completedCnt++;
+                        }
+                    }
+                }
+                catch (InterruptedException e) {
+                    stopping = true;
+
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+                catch (Throwable e) {
+                    U.error(log, "Failed to execute runnable.", e);
+                }
+            }
+        }
+
+        /**
+         * Execute the command.
+         *
+         * @param cmd Command.
+         */
+        abstract void execute(Runnable cmd);
+
+        /**
+         * @return Next runnable.
+         * @throws InterruptedException If interrupted.
+         */
+        abstract Runnable take() throws InterruptedException;
+
+        /**
+         * @return Queue size.
+         */
+        abstract int queueSize();
+
+        /**
+         * @return Stripe's queue to string presentation.
+         */
+        abstract String queueToString();
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Stripe.class, this);
+        }
+    }
+
+    /**
+     * Stripe backed by a {@link ConcurrentLinkedQueue}; the thread polls the queue for a while, then parks.
+     */
+    private static class StripeConcurrentQueue extends Stripe {
+        /** Queue. */
+        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+
+        /** */
+        private volatile boolean parked;
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            Runnable r;
+
+            for (int i = 0; i < 2048; i++) {
+                r = queue.poll();
+
+                if (r != null)
+                    return r;
+            }
+
+            parked = true;
+
+            try {
+                for (;;) {
+                    r = queue.poll();
+
+                    if (r != null)
+                        return r;
+
+                    LockSupport.park();
+
+                    if (Thread.interrupted())
+                        throw new InterruptedException();
+                }
+            }
+            finally {
+                parked = false;
+            }
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+
+            if (parked)
+                LockSupport.unpark(thread);
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentQueue.class, this, super.toString());
+        }
+    }
+
+    /**
+     * Stripe backed by a {@link ConcurrentLinkedQueue}; the thread busy-spins on the queue and never parks.
+     */
+    private static class StripeConcurrentQueueNoPark extends Stripe {
+        /** Queue. */
+        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentQueueNoPark(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /** {@inheritDoc} Busy-spins on the queue; an interrupt ends the spin so that the stripe can be stopped. */
+        @Override Runnable take() throws InterruptedException {
+            for (Runnable r = queue.poll(); ; r = queue.poll()) {
+                if (r != null)
+                    return r;
+                if (Thread.interrupted())
+                    throw new InterruptedException();
+            }
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
+        }
+    }
+
+    /**
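+     * Stripe backed by a {@link LinkedBlockingQueue}; the thread blocks in {@code take()} while the queue is empty.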
+     */
+    private static class StripeConcurrentBlockingQueue extends Stripe {
+        /** Queue. */
+        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentBlockingQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            return queue.take();
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 6baedbd..dc63adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
     }
 
     /** {@inheritDoc} */
+    @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<? super IgniteInternalFuture<T>, T1> doneCb, Executor exec) {
+        final GridFutureAdapter<T1> fut = new GridFutureAdapter<>();
+
+        exec.execute(new Runnable() {
+            @Override public void run() {
+                try {
+                    fut.onDone(doneCb.apply(GridFinishedFuture.this));
+                }
+                catch (GridClosureException e) {
+                    fut.onDone(e.unwrap());
+                }
+                catch (RuntimeException | Error e) {
+                    fut.onDone(e);
+
+                    throw e;
+                }
+            }
+        });
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridFinishedFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2cd534e..c8d85cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.future;
 
 import java.util.Arrays;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import org.apache.ignite.IgniteCheckedException;
@@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
 
     /** {@inheritDoc} */
     @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
-        return new ChainFuture<>(this, doneCb);
+        return new ChainFuture<>(this, doneCb, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+        Executor exec) {
+        return new ChainFuture<>(this, doneCb, exec);
     }
 
     /**
@@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
         /**
          * @param fut Future.
          * @param doneCb Closure.
+         * @param cbExec Optional executor to run callback.
          */
         ChainFuture(
             GridFutureAdapter<R> fut,
-            IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
+            IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+            @Nullable Executor cbExec
         ) {
             this.fut = fut;
             this.doneCb = doneCb;
 
-            fut.listen(new GridFutureChainListener<>(this, doneCb));
+            fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
index 947b2ad..367f5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
@@ -17,15 +17,17 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Future listener to fill chained future with converted result of the source future.
  */
-public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
+class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
     /** Done callback. */
     private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb;
 
+    /** */
+    private Executor cbExec;
+
     /**
      * Constructs chain listener.
+     *
      *  @param fut Target future.
      * @param doneCb Done callback.
+     * @param cbExec Optional executor to run callback.
      */
     public GridFutureChainListener(
         GridFutureAdapter<R> fut,
-        IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb
+        IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb,
+        @Nullable Executor cbExec
     ) {
         this.fut = fut;
         this.doneCb = doneCb;
+        this.cbExec = cbExec;
     }
 
     /** {@inheritDoc} */
-    @Override public void apply(IgniteInternalFuture<T> t) {
+    @Override public void apply(final IgniteInternalFuture<T> t) {
+        if (cbExec != null) {
+            cbExec.execute(new Runnable() {
+                @Override public void run() {
+                    applyCallback(t);
+                }
+            });
+        }
+        else
+            applyCallback(t);
+    }
+
+    /**
+     * @param t Target future.
+     */
+    private void applyCallback(IgniteInternalFuture<T> t) {
         try {
             fut.onDone(doneCb.apply(t));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 6820dc7..d108b56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 9b014ec..f2ab932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -35,14 +35,24 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
     /** Metrics listener. */
     protected final GridNioMetricsListener metricsLsnr;
 
+    /** */
+    private final int connIdx;
+
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      */
-    protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) {
+    protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) {
+        this.connIdx = connIdx;
         this.metricsLsnr = metricsLsnr;
     }
 
     /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean close() {
         return reserves.compareAndSet(0, -1);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 0de54e9..71b2c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -96,15 +96,20 @@ public interface GridCommunicationClient {
     /**
      * @param nodeId Remote node ID. Provided only for sync clients.
      * @param msg Message to send.
-     * @param closure Ack closure.
+     * @param c Ack closure.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if should try to resend message.
      */
-    public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure)
+    public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> c)
         throws IgniteCheckedException;
 
     /**
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 213fd8d..7987d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -62,13 +62,20 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+    @Override public void onExceptionCaught(
+        GridNioSession ses,
+        IgniteCheckedException ex
+    ) throws IgniteCheckedException {
         proceedExceptionCaught(ses, ex);
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */
@@ -137,4 +144,4 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
         proceedSessionWriteTimeout(ses);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 9925d2e..40c87cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -107,8 +107,12 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
+        return proceedSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */
@@ -139,4 +143,4 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
                 "originalEx=" + ex + ", ex=" + e + ']');
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index 7083ccf..343e625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -71,20 +71,27 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+    @Override public void onExceptionCaught(
+        GridNioSession ses,
+        IgniteCheckedException ex
+    ) throws IgniteCheckedException {
         proceedExceptionCaught(ses, ex);
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
         // No encoding needed in direct mode.
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         try {
             ByteBuffer res = parser.encode(ses, msg);
 
-            return proceedSessionWrite(ses, res);
+            return proceedSessionWrite(ses, res, fut);
         }
         catch (IOException e) {
             throw new GridNioException(e);
@@ -137,4 +144,4 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
         proceedSessionWriteTimeout(ses);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index 5f88b1f..f7928c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -105,10 +105,15 @@ public interface GridNioFilter {
      *
      * @param ses Session instance.
      * @param msg Message to send.
-     * @return Write future.
+     * @param fut {@code True} if write future should be created.
+     * @return Write future or {@code null}.
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
-    public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> proceedSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException;
 
     /**
      * Forwards session close request to the next logical filter in filter chain.
@@ -149,10 +154,11 @@ public interface GridNioFilter {
      *
      * @param ses Session on which message should be written.
      * @param msg Message being written.
-     * @return Write future.
+     * @param fut {@code True} if write future should be created.
+     * @return Write future or {@code null}.
      * @throws GridNioException If GridNioException occurred while handling event.
      */
-    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException;
+    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
 
     /**
      * Invoked when a new messages received.
@@ -241,4 +247,4 @@ public interface GridNioFilter {
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
     public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 18ab1b2..58ddae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -108,10 +108,14 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> proceedSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
         checkNext();
 
-        return nextFilter.onSessionWrite(ses, msg);
+        return nextFilter.onSessionWrite(ses, msg, fut);
     }
 
     /** {@inheritDoc} */
@@ -180,4 +184,4 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
             throw new GridNioException("Failed to proceed with filter call since previous filter is not set " +
                 "(do you use filter outside the filter chain?): " + getClass().getName());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index a3a74e3..8cc690b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -181,8 +181,12 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
      * @return Send future.
      * @throws IgniteCheckedException If IgniteCheckedException occurred while handling event.
      */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-        return tail.onSessionWrite(ses, msg);
+    @Override public GridNioFuture<?> onSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
+        return tail.onSessionWrite(ses, msg, fut);
     }
 
     /**
@@ -255,9 +259,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg)
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
             throws IgniteCheckedException {
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -290,4 +294,4 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
             return proceedResumeReads(ses);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index b02acc8..6c0c9c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
     /**
      * Sets ack closure which will be applied when ack received.
      *
-     * @param closure Ack closure.
+     * @param c Ack closure.
      */
-    public void ackClosure(IgniteInClosure<IgniteException> closure);
+    public void ackClosure(IgniteInClosure<IgniteException> c);
 
     /**
      * The method will be called when ack received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 35480ac..6258c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -35,8 +35,8 @@ public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;
 
-    /** Unacknowledged message futures. */
-    private final ArrayDeque<GridNioFuture<?>> msgFuts;
+    /** Unacknowledged messages. */
+    private final ArrayDeque<SessionWriteRequest> msgReqs;
 
     /** Number of messages to resend. */
     private int resendCnt;
@@ -77,23 +77,40 @@ public class GridNioRecoveryDescriptor {
     /** Number of descriptor reservations (for info purposes). */
     private int reserveCnt;
 
+    /** */
+    private final boolean pairedConnections;
+
     /**
+     * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param queueLimit Maximum size of unacknowledged messages queue.
      * @param node Node.
      * @param log Logger.
      */
-    public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) {
+    public GridNioRecoveryDescriptor(
+        boolean pairedConnections,
+        int queueLimit,
+        ClusterNode node,
+        IgniteLogger log
+    ) {
         assert !node.isLocal() : node;
         assert queueLimit > 0;
 
-        msgFuts = new ArrayDeque<>(queueLimit);
+        msgReqs = new ArrayDeque<>(queueLimit);
 
+        this.pairedConnections = pairedConnections;
         this.queueLimit = queueLimit;
         this.node = node;
         this.log = log;
     }
 
     /**
+     * @return {@code True} if in/out connections pair is used for communication with node.
+     */
+    public boolean pairedConnections() {
+        return pairedConnections;
+    }
+
+    /**
      * @return Connect count.
      */
     public long incrementConnectCount() {
@@ -154,19 +171,19 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param fut NIO future.
+     * @param req Write request.
      * @return {@code False} if queue limit is exceeded.
      */
-    public boolean add(GridNioFuture<?> fut) {
-        assert fut != null;
+    public boolean add(SessionWriteRequest req) {
+        assert req != null;
 
-        if (!fut.skipRecovery()) {
+        if (!req.skipRecovery()) {
             if (resendCnt == 0) {
-                msgFuts.addLast(fut);
+                msgReqs.addLast(req);
 
                 sentCnt++;
 
-                return msgFuts.size() < queueLimit;
+                return msgReqs.size() < queueLimit;
             }
             else
                 resendCnt--;
@@ -181,21 +198,19 @@ public class GridNioRecoveryDescriptor {
     public void ackReceived(long rcvCnt) {
         if (log.isDebugEnabled())
             log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt +
-                ", msgFuts=" + msgFuts.size() + ']');
+                ", msgReqs=" + msgReqs.size() + ']');
 
         while (acked < rcvCnt) {
-            GridNioFuture<?> fut = msgFuts.pollFirst();
+            SessionWriteRequest req = msgReqs.pollFirst();
 
-            assert fut != null : "Missed message future [rcvCnt=" + rcvCnt +
+            assert req != null : "Missed message [rcvCnt=" + rcvCnt +
                 ", acked=" + acked +
                 ", desc=" + this + ']';
 
-            assert fut.isDone() : fut;
-
-            if (fut.ackClosure() != null)
-                fut.ackClosure().apply(null);
+            if (req.ackClosure() != null)
+                req.ackClosure().apply(null);
 
-            fut.onAckReceived();
+            req.onAckReceived();
 
             acked++;
         }
@@ -214,7 +229,7 @@ public class GridNioRecoveryDescriptor {
      * @return {@code False} if descriptor is reserved.
      */
     public boolean onNodeLeft() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] reqs = null;
 
         synchronized (this) {
             nodeLeft = true;
@@ -222,24 +237,24 @@ public class GridNioRecoveryDescriptor {
             if (reserved)
                 return false;
 
-            if (!msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+            if (!msgReqs.isEmpty()) {
+                reqs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
 
-                msgFuts.clear();
+                msgReqs.clear();
             }
         }
 
-        if (futs != null)
-            completeOnNodeLeft(futs);
+        if (reqs != null)
+            notifyOnNodeLeft(reqs);
 
         return true;
     }
 
     /**
-     * @return Message futures for unacknowledged messages.
+     * @return Requests for unacknowledged messages.
      */
-    public Deque<GridNioFuture<?>> messagesFutures() {
-        return msgFuts;
+    public Deque<SessionWriteRequest> messagesRequests() {
+        return msgReqs;
     }
 
     /**
@@ -277,14 +292,14 @@ public class GridNioRecoveryDescriptor {
             if (!nodeLeft)
                 ackReceived(rcvCnt);
 
-            resendCnt = msgFuts.size();
+            resendCnt = msgReqs.size();
         }
     }
 
     /**
      *
      */
-    public void connected() {
+    public void onConnected() {
         synchronized (this) {
             assert reserved : this;
             assert !connected : this;
@@ -306,10 +321,37 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
+     * @return Connected flag.
+     */
+    public boolean connected() {
+        synchronized (this) {
+            return connected;
+        }
+    }
+
+    /**
+     * @return Reserved flag.
+     */
+    public boolean reserved() {
+        synchronized (this) {
+            return reserved;
+        }
+    }
+
+    /**
+     * @return Current handshake index.
+     */
+    public Long handshakeIndex() {
+        synchronized (this) {
+            return handshakeReq != null ? handshakeReq.get1() : null;
+        }
+    }
+
+    /**
      *
      */
     public void release() {
-        GridNioFuture<?>[] futs = null;
+        SessionWriteRequest[] futs = null;
 
         synchronized (this) {
             connected = false;
@@ -329,15 +371,15 @@ public class GridNioRecoveryDescriptor {
                 notifyAll();
             }
 
-            if (nodeLeft && !msgFuts.isEmpty()) {
-                futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
+            if (nodeLeft && !msgReqs.isEmpty()) {
+                futs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]);
 
-                msgFuts.clear();
+                msgReqs.clear();
             }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs);
+            notifyOnNodeLeft(futs);
     }
 
     /**
@@ -398,16 +440,16 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param futs Futures to complete.
+     * @param reqs Requests to notify about error.
      */
-    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
-        for (GridNioFuture<?> msg : futs) {
-            IOException e = new IOException("Failed to send message, node has left: " + node.id());
+    private void notifyOnNodeLeft(SessionWriteRequest[] reqs) {
+        IOException e = new IOException("Failed to send message, node has left: " + node.id());
 
-            ((GridNioFutureImpl)msg).onDone(e);
+        for (SessionWriteRequest req : reqs) {
+            req.onError(e);
 
-            if (msg.ackClosure() != null)
-                msg.ackClosure().apply(new IgniteException(e));
+            if (req.ackClosure() != null)
+                req.ackClosure().apply(new IgniteException(e));
         }
     }
 


Mime
View raw message