ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [27/71] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups
Date Tue, 21 Mar 2017 09:24:18 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index d108b56..7af6139 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
@@ -201,7 +203,10 @@ public class IpcToNioAdapter<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+            Object msg,
+            boolean fut,
+            IgniteInClosure<IgniteException> ackC) {
             assert ses == IpcToNioAdapter.this.ses;
 
             return send((Message)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
index 7987d3d..f110cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.util.nio;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Verifies that first bytes received in accepted (incoming)
@@ -73,9 +75,10 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg, fut);
+        return proceedSessionWrite(ses, msg, fut, ackC);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
index 5d90cdb..d55bc54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.util.nio;
 
 import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerPool;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Enables multithreaded notification of session opened, message received and session closed events.
@@ -110,9 +112,10 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
-        return proceedSessionWrite(ses, msg, fut);
+        return proceedSessionWrite(ses, msg, fut, ackC);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
index 343e625..b81086a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.util.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Filter that transforms byte buffers to user-defined objects and vice-versa
@@ -82,16 +84,17 @@ public class GridNioCodecFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         // No encoding needed in direct mode.
         if (directMode)
-            return proceedSessionWrite(ses, msg, fut);
+            return proceedSessionWrite(ses, msg, fut, ackC);
 
         try {
             ByteBuffer res = parser.encode(ses, msg);
 
-            return proceedSessionWrite(ses, res, fut);
+            return proceedSessionWrite(ses, res, fut, ackC);
         }
         catch (IOException e) {
             throw new GridNioException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
index be77d39..eab4909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java
@@ -31,6 +31,13 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> {
     private static final long serialVersionUID = 0L;
 
     /**
+     *
+     */
+    public GridNioEmbeddedFuture() {
+        super(null);
+    }
+
+    /**
      * Callback to notify that future is finished.
      * This method must delegate to {@link #onDone(GridNioFuture, Throwable)} method.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
index f7928c4..9163a4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * This interface defines the general element in transformation chain between the nio server and
@@ -106,13 +108,15 @@ public interface GridNioFilter {
      * @param ses Session instance.
      * @param msg Message to send.
      * @param fut {@code True} if write future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future or {@code null}.
      * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter.
      */
     public GridNioFuture<?> proceedSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException;
 
     /**
@@ -155,10 +159,14 @@ public interface GridNioFilter {
      * @param ses Session on which message should be written.
      * @param msg Message being written.
      * @param fut {@code True} if write future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future or {@code null}.
      * @throws GridNioException If GridNioException occurred while handling event.
      */
-    public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException;
+    public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+        Object msg,
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException;
 
     /**
      * Invoked when a new messages received.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
index 58ddae5..4ede4b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Class that defines the piece for application-to-network and vice-versa data conversions
@@ -111,11 +113,12 @@ public abstract class GridNioFilterAdapter implements GridNioFilter {
     @Override public GridNioFuture<?> proceedSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         checkNext();
 
-        return nextFilter.onSessionWrite(ses, msg, fut);
+        return nextFilter.onSessionWrite(ses, msg, fut, ackC);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
index 8cc690b..ec59020 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.lang.IgniteInClosure;
 
 /**
  * Filter chain implementation for nio server filters.
@@ -184,9 +186,10 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
-        return tail.onSessionWrite(ses, msg, fut);
+        return tail.onSessionWrite(ses, msg, fut, ackC);
     }
 
     /**
@@ -259,9 +262,11 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut)
-            throws IgniteCheckedException {
-            return proceedSessionWrite(ses, msg, fut);
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+            Object msg,
+            boolean fut,
+            IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+            return proceedSessionWrite(ses, msg, fut, ackC);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 3d18ab7..2835a22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -59,11 +59,6 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G
     }
 
     /** {@inheritDoc} */
-    @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 6c0c9c6..4d1fee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -43,13 +43,6 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> {
     public boolean skipRecovery();
 
     /**
-     * Sets ack closure which will be applied when ack received.
-     *
-     * @param c Ack closure.
-     */
-    public void ackClosure(IgniteInClosure<IgniteException> c);
-
-    /**
      * The method will be called when ack received.
      */
     public void onAckReceived();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index fe97039..6a94a54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -30,10 +30,17 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
     private static final long serialVersionUID = 0L;
 
     /** */
-    protected boolean msgThread;
+    private boolean msgThread;
 
     /** */
-    protected IgniteInClosure<IgniteException> ackClosure;
+    protected final IgniteInClosure<IgniteException> ackC;
+
+    /**
+     * @param ackC Ack closure.
+     */
+    public GridNioFutureImpl(IgniteInClosure<IgniteException> ackC) {
+        this.ackC = ackC;
+    }
 
     /** {@inheritDoc} */
     @Override public void messageThread(boolean msgThread) {
@@ -51,18 +58,13 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi
     }
 
     /** {@inheritDoc} */
-    @Override public void ackClosure(IgniteInClosure<IgniteException> closure) {
-        ackClosure = closure;
-    }
-
-    /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInClosure<IgniteException> ackClosure() {
-        return ackClosure;
+        return ackC;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index fefdf15..7f25e40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -74,7 +74,6 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import sun.nio.ch.DirectBuffer;
 
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION;
 
@@ -481,22 +480,26 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param createFut {@code True} if future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException {
+    GridNioFuture<?> send(GridNioSession ses,
+        ByteBuffer msg,
+        boolean createFut,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
         if (createFut) {
-            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, ackC);
 
             send0(impl, fut, false);
 
             return fut;
         }
         else {
-            SessionWriteRequest req = new WriteRequestImpl(ses, msg, true);
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, true, ackC);
 
             send0(impl, req, false);
 
@@ -508,23 +511,27 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      * @param createFut {@code True} if future should be created.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException {
+    GridNioFuture<?> send(GridNioSession ses,
+        Message msg,
+        boolean createFut,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
         if (createFut) {
             NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-                skipRecoveryPred.apply(msg));
+                skipRecoveryPred.apply(msg), ackC);
 
             send0(impl, fut, false);
 
             return fut;
         }
         else {
-            SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg));
+            SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC);
 
             send0(impl, req, false);
 
@@ -544,11 +551,6 @@ public class GridNioServer<T> {
 
         int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
 
-        IgniteInClosure<IgniteException> ackC;
-
-        if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
-            req.ackClosure(ackC);
-
         if (ses.closed()) {
             if (ses.removeFuture(req)) {
                 IOException err = new IOException("Failed to send message (connection was closed): " + ses);
@@ -597,8 +599,11 @@ public class GridNioServer<T> {
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
         if (lsnr != null) {
-            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
-                skipRecoveryPred.apply(msg));
+            NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl,
+                NioOperation.REQUIRE_WRITE,
+                msg,
+                skipRecoveryPred.apply(msg),
+                null);
 
             fut.listen(lsnr);
 
@@ -2597,11 +2602,6 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
         @Override public void onAckReceived() {
             throw new UnsupportedOperationException();
         }
@@ -2664,17 +2664,22 @@ public class GridNioServer<T> {
         private final boolean skipRecovery;
 
         /** */
-        private IgniteInClosure<IgniteException> ackC;
+        private final IgniteInClosure<IgniteException> ackC;
 
         /**
          * @param ses Session.
          * @param msg Message.
          * @param skipRecovery Skip recovery flag.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) {
+        WriteRequestImpl(GridNioSession ses,
+            Object msg,
+            boolean skipRecovery,
+            IgniteInClosure<IgniteException> ackC) {
             this.ses = ses;
             this.msg = msg;
             this.skipRecovery = skipRecovery;
+            this.ackC = ackC;
         }
 
         /** {@inheritDoc} */
@@ -2693,11 +2698,6 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public void ackClosure(IgniteInClosure<IgniteException> c) {
-            ackC = c;
-        }
-
-        /** {@inheritDoc} */
         @Override public void onAckReceived() {
             assert msg instanceof Message;
 
@@ -2798,6 +2798,8 @@ public class GridNioServer<T> {
             boolean accepted,
             @Nullable Map<Integer, ?> meta
         ) {
+            super(null);
+
             op = NioOperation.REGISTER;
 
             this.sockCh = sockCh;
@@ -2812,6 +2814,8 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          */
         NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) {
+            super(null);
+
             assert ses != null || op == NioOperation.DUMP_STATS : "Invalid params [ses=" + ses + ", op=" + op + ']';
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -2826,8 +2830,14 @@ public class GridNioServer<T> {
          * @param ses Session to change.
          * @param op Requested operation.
          * @param msg Message.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses,
+            NioOperation op,
+            Object msg,
+            IgniteInClosure<IgniteException> ackC) {
+            super(ackC);
+
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -2845,9 +2855,15 @@ public class GridNioServer<T> {
          * @param op Requested operation.
          * @param commMsg Direct message.
          * @param skipRecovery Skip recovery flag.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            Message commMsg, boolean skipRecovery) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses,
+            NioOperation op,
+            Message commMsg,
+            boolean skipRecovery,
+            IgniteInClosure<IgniteException> ackC) {
+            super(ackC);
+
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -3013,7 +3029,10 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses,
+            Object msg,
+            boolean fut,
+            IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
@@ -3032,10 +3051,10 @@ public class GridNioServer<T> {
                     return null;
                 }
                 else
-                    return send(ses, (Message)msg, fut);
+                    return send(ses, (Message)msg, fut, ackC);
             }
             else
-                return send(ses, (ByteBuffer)msg, fut);
+                return send(ses, (ByteBuffer)msg, fut, ackC);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index c1b60ab..21eabf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util.nio;
 
 import java.net.InetSocketAddress;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -107,8 +109,11 @@ public interface GridNioSession {
 
     /**
      * @param msg Message to be sent.
+     * @param ackC Optional closure invoked when ack for message is received.
+     * @throws IgniteCheckedException If failed.
      */
-    public void sendNoFuture(Object msg) throws IgniteCheckedException;
+    public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
+        throws IgniteCheckedException;
 
     /**
      * Gets metadata associated with specified key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 7424531..98a22d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.util.nio;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MAX_KEYS_CNT;
@@ -105,7 +107,7 @@ public class GridNioSessionImpl implements GridNioSession {
         try {
             resetSendScheduleTime();
 
-            return chain().onSessionWrite(this, msg, true);
+            return chain().onSessionWrite(this, msg, true, null);
         }
         catch (IgniteCheckedException e) {
             close();
@@ -115,9 +117,10 @@ public class GridNioSessionImpl implements GridNioSession {
     }
 
     /** {@inheritDoc} */
-    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+    @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC)
+        throws IgniteCheckedException {
         try {
-            chain().onSessionWrite(this, msg, false);
+            chain().onSessionWrite(this, msg, false, ackC);
         }
         catch (IgniteCheckedException e) {
             close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index bdb3a29..5385430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -42,10 +42,7 @@ public enum GridNioSessionMetaKey {
     MARSHALLER_ID,
 
     /** Message writer. */
-    MSG_WRITER,
-
-    /** Ack closure. */
-    ACK_CLOSURE;
+    MSG_WRITER;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index d941bae..ab9b2eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -123,7 +123,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
 
     /** {@inheritDoc} */
     @Override public synchronized boolean sendMessage(UUID nodeId, Message msg,
-        IgniteInClosure<IgniteException> closure) throws IgniteCheckedException {
+        IgniteInClosure<IgniteException> c) throws IgniteCheckedException {
         assert nodeId != null;
 
         if (closed())
@@ -142,8 +142,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
 
         markUsed();
 
-        if (closure != null)
-            closure.apply(null);
+        if (c != null)
+            c.apply(null);
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 3397772..eff893f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -32,8 +32,6 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
-
 /**
  * Grid client for NIO server.
  */
@@ -115,18 +113,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
             // Node ID is never provided in asynchronous send mode.
             assert nodeId == null;
 
-            if (c != null)
-                ses.addMeta(ACK_CLOSURE.ordinal(), c);
-
-            ses.sendNoFuture(msg);
-
-            if (c != null)
-                ses.removeMeta(ACK_CLOSURE.ordinal());
+            ses.sendNoFuture(msg, c);
         }
         catch (IgniteCheckedException e) {
-            if (c != null)
-                ses.removeMeta(ACK_CLOSURE.ordinal());
-
             if (log.isDebugEnabled())
                 log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
index 508c791..e24f3ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
@@ -42,13 +42,6 @@ public interface SessionWriteRequest {
     public boolean skipRecovery();
 
     /**
-     * Sets ack closure which will be applied when ack received.
-     *
-     * @param c Ack closure.
-     */
-    public void ackClosure(IgniteInClosure<IgniteException> c);
-
-    /**
      * The method will be called when ack received.
      */
     public void onAckReceived();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 8ed7db0..b4bd34a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 
@@ -285,10 +286,11 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(
         GridNioSession ses,
         Object msg,
-        boolean fut
+        boolean fut,
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         if (directMode)
-            return proceedSessionWrite(ses, msg, fut);
+            return proceedSessionWrite(ses, msg, fut, ackC);
 
         ByteBuffer input = checkMessage(ses, msg);
 
@@ -307,13 +309,13 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
             if (hnd.isHandshakeFinished()) {
                 hnd.encrypt(input);
 
-                return hnd.writeNetBuffer();
+                return hnd.writeNetBuffer(ackC);
             }
             else {
                 if (log.isDebugEnabled())
                     log.debug("Write request received during handshake, scheduling deferred write: " + ses);
 
-                return hnd.deferredWrite(input);
+                return hnd.deferredWrite(input, ackC);
             }
         }
         catch (SSLException e) {
@@ -390,7 +392,7 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
         try {
             hnd.closeOutbound();
 
-            hnd.writeNetBuffer();
+            hnd.writeNetBuffer(null);
         }
         catch (SSLException e) {
             U.warn(log, "Failed to shutdown SSL session gracefully (will force close) [ex=" + e + ", ses=" + ses + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index 269e8b9..e268716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -27,6 +27,7 @@ import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLSession;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.nio.GridNioEmbeddedFuture;
 import org.apache.ignite.internal.util.nio.GridNioException;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
@@ -274,7 +276,7 @@ class GridNioSslHandler extends ReentrantLock {
                             log.debug("Wrapped handshake data [status=" + res.getStatus() + ", handshakeStatus=" +
                                 handshakeStatus + ", ses=" + ses + ']');
 
-                        writeNetBuffer();
+                        writeNetBuffer(null);
 
                         break;
                     }
@@ -412,16 +414,17 @@ class GridNioSslHandler extends ReentrantLock {
      * Adds write request to the queue.
      *
      * @param buf Buffer to write.
+     * @param ackC Closure invoked when message ACK is received.
      * @return Write future.
      */
-    GridNioFuture<?> deferredWrite(ByteBuffer buf) {
+    GridNioFuture<?> deferredWrite(ByteBuffer buf, IgniteInClosure<IgniteException> ackC) {
         assert isHeldByCurrentThread();
 
         GridNioEmbeddedFuture<Object> fut = new GridNioEmbeddedFuture<>();
 
         ByteBuffer cp = copy(buf);
 
-        deferredWriteQueue.offer(new WriteRequest(fut, cp));
+        deferredWriteQueue.offer(new WriteRequest(fut, cp, ackC));
 
         return fut;
     }
@@ -437,7 +440,7 @@ class GridNioSslHandler extends ReentrantLock {
         while (!deferredWriteQueue.isEmpty()) {
             WriteRequest req = deferredWriteQueue.poll();
 
-            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true));
+            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true, req.ackC));
         }
     }
 
@@ -475,14 +478,15 @@ class GridNioSslHandler extends ReentrantLock {
      * Copies data from out net buffer and passes it to the underlying chain.
      *
      * @return Write future.
+     * @param ackC Closure invoked when message ACK is received.
      * @throws GridNioException If send failed.
      */
-    GridNioFuture<?> writeNetBuffer() throws IgniteCheckedException {
+    GridNioFuture<?> writeNetBuffer(IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert isHeldByCurrentThread();
 
         ByteBuffer cp = copy(outNetBuf);
 
-        return parent.proceedSessionWrite(ses, cp, true);
+        return parent.proceedSessionWrite(ses, cp, true, ackC);
     }
 
     /**
@@ -670,20 +674,27 @@ class GridNioSslHandler extends ReentrantLock {
      */
     private static class WriteRequest {
         /** Future that should be completed. */
-        private GridNioEmbeddedFuture<Object> fut;
+        private final GridNioEmbeddedFuture<Object> fut;
 
         /** Buffer needed to be written. */
-        private ByteBuffer buf;
+        private final ByteBuffer buf;
+
+        /** Closure invoked when message ACK is received. */
+        private final IgniteInClosure<IgniteException> ackC;
 
         /**
          * Creates write request.
          *
          * @param fut Future.
          * @param buf Buffer to write.
+         * @param ackC Closure invoked when message ACK is received.
          */
-        private WriteRequest(GridNioEmbeddedFuture<Object> fut, ByteBuffer buf) {
+        private WriteRequest(GridNioEmbeddedFuture<Object> fut,
+            ByteBuffer buf,
+            IgniteInClosure<IgniteException> ackC) {
             this.fut = fut;
             this.buf = buf;
+            this.ackC = ackC;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 35568c3..fe915e5 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -369,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
                     try {
-                        ses.sendNoFuture(nodeIdMessage());
+                        ses.sendNoFuture(nodeIdMessage(), null);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send message: " + e, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 8d88876..6005ac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -46,19 +46,19 @@ public class IgniteThread extends Thread {
     /** The name of the Ignite instance this thread belongs to. */
     protected final String igniteInstanceName;
 
-    /** Group index. */
-    private final int grpIdx;
-
     /** */
     private int compositeRwLockIdx;
 
+    /** Stripe number; non-negative if this thread is a striped pool thread. */
+    private final int stripe;
+
     /**
      * Creates thread with given worker.
      *
      * @param worker Runnable to create thread with.
      */
     public IgniteThread(GridWorker worker) {
-        this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED);
+        this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1);
     }
 
     /**
@@ -69,7 +69,7 @@ public class IgniteThread extends Thread {
      * @param r Runnable to execute.
      */
     public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
-        this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED);
+        this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1);
     }
 
     /**
@@ -79,9 +79,10 @@ public class IgniteThread extends Thread {
      * @param threadName Name of thread.
      * @param r Runnable to execute.
      * @param grpIdx Index within a group.
+     * @param stripe Non-negative stripe number if this thread is a striped pool thread.
      */
-    public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx) {
-        this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx);
+    public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
+        this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe);
     }
 
     /**
@@ -93,14 +94,16 @@ public class IgniteThread extends Thread {
      * @param threadName Name of thread.
      * @param r Runnable to execute.
      * @param grpIdx Thread index within a group.
+     * @param stripe Non-negative stripe number if this thread is a striped pool thread.
      */
-    public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx) {
+    public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
         super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
 
         A.ensure(grpIdx >= -1, "grpIdx >= -1");
 
         this.igniteInstanceName = igniteInstanceName;
-        this.grpIdx = compositeRwLockIdx = grpIdx;
+        this.compositeRwLockIdx = grpIdx;
+        this.stripe = stripe;
     }
 
     /**
@@ -112,18 +115,15 @@ public class IgniteThread extends Thread {
         super(threadGrp, threadName);
 
         this.igniteInstanceName = igniteInstanceName;
-        this.grpIdx = compositeRwLockIdx = GRP_IDX_UNASSIGNED;
+        this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
+        this.stripe = -1;
     }
 
     /**
-     * Gets name of the grid this thread belongs to.
-     *
-     * @return Name of the grid this thread belongs to.
-     * @deprecated use {@link #getIgniteInstanceName()}
+     * @return Non-negative stripe number if this thread is a striped pool thread.
      */
-    @Deprecated
-    public String getGridName() {
-        return getIgniteInstanceName();
+    public int stripe() {
+        return stripe;
     }
 
     /**
@@ -136,13 +136,6 @@ public class IgniteThread extends Thread {
     }
 
     /**
-     * @return Group index.
-     */
-    public int groupIndex() {
-        return grpIdx;
-    }
-
-    /**
      * @return Composite RW lock index.
      */
     public int compositeRwLockIndex() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 119ef70..d2f0b15 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -61,7 +61,7 @@ public class IgniteThreadFactory implements ThreadFactory {
 
     /** {@inheritDoc} */
     @Override public Thread newThread(@NotNull Runnable r) {
-        return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet());
+        return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index da2923f..8cbb596 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -47,7 +47,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
     private static final int SAMPLE_CNT = 1;
 
     /** */
-    private static final byte DIRECT_TYPE = (byte)210;
+    private static final byte DIRECT_TYPE = -127;
 
     /** */
     private int bufSize;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
index 7c0e485..a158f7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
@@ -40,6 +40,13 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -63,13 +70,12 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param future Future.
+     * @param fut Future.
      * @return Internal future.
      */
-    private static IgniteInternalFuture internalFuture(IgniteFuture future) {
-        assert future instanceof IgniteFutureImpl;
+    private static IgniteInternalFuture internalFuture(IgniteFuture fut) {
+        assert fut instanceof IgniteFutureImpl : fut;
 
-        return ((IgniteFutureImpl)future).internalFuture();
+        return ((IgniteFutureImpl) fut).internalFuture();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 52c0ac5..09a0d9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -78,8 +78,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
 
         cfg.setNetworkTimeout(60_000);
 
-        cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
-
         TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
 
         discoSpi.setSocketTimeout(30_000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index 76cf78c..a12b6b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -94,13 +94,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
     /**
      * @throws Exception If failed.
      */
-    public void testPartitionedClock() throws Exception {
-        checkMessages(false, CLOCK);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testPartitionedPrimary() throws Exception {
         checkMessages(false, PRIMARY);
     }
@@ -108,13 +101,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
     /**
      * @throws Exception If failed.
      */
-    public void testClientClock() throws Exception {
-        checkMessages(true, CLOCK);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testClientPrimary() throws Exception {
         checkMessages(true, PRIMARY);
     }
@@ -206,14 +192,14 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
         private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass());
 
             if (cntr != null)
                 cntr.incrementAndGet();
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -221,7 +207,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
          *
          * @param cls Class to count.
          */
-        public void registerMessage(Class<?> cls) {
+        void registerMessage(Class<?> cls) {
             AtomicInteger cntr = cntMap.get(cls);
 
             if (cntr == null)
@@ -232,7 +218,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
          * @param cls Message type to get count.
          * @return Number of messages of given class.
          */
-        public int messageCount(Class<?> cls) {
+        int messageCount(Class<?> cls) {
             AtomicInteger cntr = cntMap.get(cls);
 
             return cntr == null ? 0 : cntr.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index ba37974..dd27d72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 728bf13..a44e49e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -401,9 +401,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         assertEquals(3, msgs.size());
 
-        for (Object msg : msgs)
-            assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest());
-
         map.put(primaryKey(ignite0.cache(null)), 3);
         map.put(primaryKey(ignite1.cache(null)), 4);
         map.put(primaryKey(ignite2.cache(null)), 5);
@@ -1693,8 +1690,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicPrimaryPutAllMultinode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
         multinode(PRIMARY, TestType.PUT_ALL);
     }
 
@@ -1702,8 +1697,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicClockPutAllMultinode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1685");
-
         multinode(CLOCK, TestType.PUT_ALL);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index cb1f6fb4..1d2cd2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -103,7 +103,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
         for (int i = 0; i < GRID_CNT; i++) {
             final IgniteKernal grid = (IgniteKernal)grid(i);
 
-            GridTestUtils.retryAssert(log, 10, 100, new CA() {
+            GridTestUtils.retryAssert(log, 10, 500, new CA() {
                 @Override public void apply() {
                     assertTrue(grid.internalCache().context().mvcc().atomicFutures().isEmpty());
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
index 5050300..2600e7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
@@ -39,9 +39,11 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -702,8 +704,12 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     @SuppressWarnings("ConstantConditions")
-    private void checkEntry(Ignite ignite, Integer key, @Nullable Integer val, boolean expectNear, UUID... expReaders)
-        throws Exception {
+    private void checkEntry(Ignite ignite,
+        Integer key,
+        @Nullable Integer val,
+        boolean expectNear,
+        final UUID... expReaders) throws Exception
+    {
         GridCacheAdapter<Integer, Integer> near = ((IgniteKernal) ignite).internalCache();
 
         assertTrue(near.isNear());
@@ -728,11 +734,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht();
 
-        GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key);
+        final GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key);
 
         if (expectDht) {
             assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry);
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        return dhtEntry.readers().size() == expReaders.length;
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }, 5000);
+
             Collection<UUID> readers = dhtEntry.readers();
 
             assertEquals(expReaders.length, readers.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 3942e35..2971f81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -43,10 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -152,7 +152,12 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        ignite(0).destroyCache(null);
+        try {
+            checkInternalCleanup();
+        }
+        finally {
+            ignite(0).destroyCache(null);
+        }
     }
 
     /**
@@ -456,22 +461,40 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
         for (int i = 0; i < keysCnt; i++)
             assertEquals((Integer)iter, cache.get(i));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkInternalCleanup() throws Exception{
+        checkNoAtomicFutures();
+
+        checkOnePhaseCommitReturnValuesCleaned();
+    }
 
+    /**
+     * @throws Exception If failed.
+     */
+    void checkNoAtomicFutures() throws Exception {
         for (int i = 0; i < GRID_CNT; i++) {
-            IgniteKernal ignite = (IgniteKernal)grid(i);
+            final IgniteKernal ignite = (IgniteKernal)grid(i);
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ignite.context().cache().context().mvcc().atomicFuturesCount() == 0;
+                }
+            }, 5_000);
 
             Collection<?> futs = ignite.context().cache().context().mvcc().atomicFutures();
 
             assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
         }
-
-        checkOnePhaseCommitReturnValuesCleaned();
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
-    protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+    void checkOnePhaseCommitReturnValuesCleaned() throws Exception {
         U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
 
         for (int i = 0; i < GRID_CNT; i++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index cc5f548..7460828 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -208,7 +208,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
     public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
         ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));


Mime
View raw message