ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/24] incubator-ignite git commit: #ignite-138: Implemented.
Date Thu, 12 Feb 2015 12:57:19 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-50 d83c7fcc2 -> 8292f3297


#ignite-138: Implemented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eb74c5ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eb74c5ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eb74c5ed

Branch: refs/heads/ignite-50
Commit: eb74c5ed625eb0dff30b0c4f31a9d3c6084bd937
Parents: ff7ee6d
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Mon Feb 2 17:17:08 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Mon Feb 2 17:17:08 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      | 15 +++-
 .../cache/GridCacheEvictionManager.java         |  4 +-
 .../processors/cache/GridCacheIoManager.java    | 74 ++++----------------
 .../GridCachePartitionExchangeManager.java      |  5 +-
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  4 +-
 ...dCachePessimisticCheckCommittedTxFuture.java |  2 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  4 +-
 .../distributed/dht/GridDhtLockFuture.java      |  5 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  9 ++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  5 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  3 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  9 ++-
 .../dht/GridPartitionedGetFuture.java           |  2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  6 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  4 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  5 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |  2 +-
 .../preloader/GridDhtPartitionDemandPool.java   |  2 +-
 .../preloader/GridDhtPartitionSupplyPool.java   | 20 ++----
 .../GridDhtPartitionsExchangeFuture.java        |  8 ++-
 .../dht/preloader/GridDhtPreloader.java         |  2 +-
 .../distributed/near/GridNearGetFuture.java     |  2 +-
 .../distributed/near/GridNearLockFuture.java    |  5 +-
 .../near/GridNearTransactionalCache.java        |  4 +-
 .../near/GridNearTxFinishFuture.java            |  3 +-
 .../near/GridNearTxPrepareFuture.java           |  3 +-
 .../query/GridCacheDistributedQueryFuture.java  |  2 +-
 .../query/GridCacheDistributedQueryManager.java |  4 +-
 .../cache/transactions/IgniteTxAdapter.java     |  7 ++
 .../cache/transactions/IgniteTxEx.java          |  6 ++
 .../cache/transactions/IgniteTxHandler.java     |  6 +-
 33 files changed, 99 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f624f8..d3fa2c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -62,10 +62,11 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
 import static org.apache.ignite.cache.CacheMemoryMode.*;
 import static org.apache.ignite.cache.CachePreloadMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
 
 /**
  * Cache context.
@@ -184,6 +185,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** System cache flag. */
     private boolean sys;
 
+    /** IO policy. */
+    private GridIoPolicy plc;
+
     /** Default expiry policy. */
     private ExpiryPolicy expiryPlc;
 
@@ -301,6 +305,8 @@ public class GridCacheContext<K, V> implements Externalizable {
 
         sys = CU.UTILITY_CACHE_NAME.equals(cacheName);
 
+        plc = sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+
         Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
 
         expiryPlc = factory != null ? factory.create() : null;
@@ -366,6 +372,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return IO policy for the given cache.
+     */
+    public GridIoPolicy ioPolicy() {
+        return plc;
+    }
+
+    /**
      * @param cache Cache.
      */
     public void cache(GridCacheAdapter<K, V> cache) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 837a6e4..ef63038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -470,7 +470,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
      */
     private void sendEvictionResponse(UUID nodeId, GridCacheEvictionResponse<K, V> res) {
         try {
-            cctx.io().send(nodeId, res);
+            cctx.io().send(nodeId, res, cctx.ioPolicy());
 
             if (log.isDebugEnabled())
                 log.debug("Sent eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() +
@@ -1732,7 +1732,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
                     log.debug("Sending eviction request [node=" + nodeId + ", req=" + req + ']');
 
                 try {
-                    cctx.io().send(nodeId, req);
+                    cctx.io().send(nodeId, req, cctx.ioPolicy());
                 }
                 catch (ClusterTopologyException ignored) {
                     // Node left the topology.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 3b7e2a0..359236d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -35,7 +35,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  * Cache communication manager.
@@ -183,6 +182,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      * @param cacheMsg Cache message.
      * @param c Handler closure.
      */
+    @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage<K, V> cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) {
         rw.readLock();
@@ -336,18 +336,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      * @throws IgniteCheckedException If sending failed.
      * @throws ClusterTopologyException If receiver left.
      */
-    public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
-        send(node, msg, SYSTEM_POOL);
-    }
-
-    /**
-     * Sends communication message.
-     *
-     * @param node Node to send the message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If sending failed.
-     * @throws ClusterTopologyException If receiver left.
-     */
+    @SuppressWarnings("unchecked")
     public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
         assert !node.isLocal();
 
@@ -399,13 +388,14 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      *
      * @param nodes Nodes to send to.
      * @param msg Message to send.
+     * @param plc IO policy.
      * @param fallback Callback for failed nodes.
      * @return {@code True} if nodes are empty or message was sent, {@code false} if
      *      all nodes have left topology while sending this message.
      * @throws IgniteCheckedException If send failed.
      */
-    @SuppressWarnings( {"BusyWait"})
-    public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg,
+    @SuppressWarnings({"BusyWait", "unchecked"})
+    public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage<K, V> msg, GridIoPolicy plc,
         @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
         assert nodes != null;
         assert msg != null;
@@ -445,7 +435,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
                 else
                     msg0 = (GridCacheMessage<K, V>)msg.clone();
 
-                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
+                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, plc);
 
                 boolean added = false;
 
@@ -527,29 +517,12 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      * @param msg Message to send.
      * @throws IgniteCheckedException If sending failed.
      */
-    public void send(UUID nodeId, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
-        ClusterNode n = cctx.discovery().node(nodeId);
-
-        if (n == null)
-            throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
-                msg + ']');
-
-        send(n, msg);
-    }
-
-    /**
-     * Sends communication message.
-     *
-     * @param nodeId ID of node to send the message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If sending failed.
-     */
     public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
         ClusterNode n = cctx.discovery().node(nodeId);
 
         if (n == null)
-            throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
-                msg + ']');
+            throw new ClusterTopologyException("Failed to send message because node left grid [nodeId=" + nodeId +
+                ", msg=" + msg + ']');
 
         send(n, msg, plc);
     }
@@ -558,10 +531,11 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
+     * @param plc IO policy.
      * @param timeout Timeout to keep a message on receiving queue.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg,
+    public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg, GridIoPolicy plc,
         long timeout) throws IgniteCheckedException {
         onSend(msg, node.id());
 
@@ -571,7 +545,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
             try {
                 cnt++;
 
-                cctx.gridIO().sendOrderedMessage(node, topic, msg, SYSTEM_POOL, timeout, false);
+                cctx.gridIO().sendOrderedMessage(node, topic, msg, plc, timeout, false);
 
                 if (log.isDebugEnabled())
                     log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
@@ -671,30 +645,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
     }
 
     /**
-     * Removes message handler.
-     *
-     * @param type Type of message.
-     * @param c Handler.
-     */
-    public void removeHandler(Class<?> type, IgniteBiInClosure<UUID, ?> c) {
-        assert type != null;
-        assert c != null;
-
-        boolean res = clsHandlers.remove(type, c);
-
-        if (log != null && log.isDebugEnabled()) {
-            if (res) {
-                log.debug("Removed cache communication handler " +
-                    "[type=" + type + ", handler=" + c + ']');
-            }
-            else {
-                log.debug("Cache communication handler is not registered " +
-                    "[type=" + type + ", handler=" + c + ']');
-            }
-        }
-    }
-
-    /**
      * Adds ordered message handler.
      *
      * @param topic Topic.
@@ -736,7 +686,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
      * @param cacheMsg Message.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("ErrorNotRethrown")
+    @SuppressWarnings({"ErrorNotRethrown", "unchecked"})
     private void unmarshall(UUID nodeId, GridCacheMessage<K, V> cacheMsg) throws IgniteCheckedException {
         if (cctx.localNodeId().equals(nodeId))
             return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 835aa39..fa7b5d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
 
 /**
@@ -484,7 +485,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
 
-        cctx.io().safeSend(nodes, m, null);
+        cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
 
         return true;
     }
@@ -508,7 +509,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
 
         try {
-            cctx.io().send(node, m);
+            cctx.io().send(node, m, SYSTEM_POOL);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index dbfbb2c..567fd37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -157,7 +157,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                         nodeTransactions(id), futureId(), fut.futureId());
 
                     try {
-                        cctx.io().send(id, req);
+                        cctx.io().send(id, req, tx.ioPolicy());
                     }
                     catch (ClusterTopologyException ignored) {
                         fut.onNodeLeft();
@@ -178,7 +178,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                         nodeTransactions(nodeId), futureId(), fut.futureId());
 
                 try {
-                    cctx.io().send(nodeId, req);
+                    cctx.io().send(nodeId, req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyException ignored) {
                     fut.onNodeLeft();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
index 2afa7a4..56cf33b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxFuture.java
@@ -148,7 +148,7 @@ public class GridCachePessimisticCheckCommittedTxFuture<K, V> extends GridCompou
                 add(fut);
 
                 try {
-                    cctx.io().send(rmtNode.id(), req);
+                    cctx.io().send(rmtNode.id(), req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyException ignored) {
                     fut.onNodeLeft();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c9681c8..de49e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -642,7 +642,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion());
 
                 try {
-                    ctx.io().send(nodeId, res);
+                    ctx.io().send(nodeId, res, ctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
@@ -717,7 +717,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) {
                         try {
-                            ctx.io().send(req.getKey(), req.getValue());
+                            ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy());
                         }
                         catch (IgniteCheckedException e) {
                             log.error("Failed to send TTL update request.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index f0da6b4..06bebd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -41,7 +41,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 
 /**
@@ -883,7 +882,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                         if (log.isDebugEnabled())
                             log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
 
-                        cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                        cctx.io().send(n, req, cctx.ioPolicy());
                     }
                     catch (IgniteCheckedException e) {
                         // Fail the whole thing.
@@ -946,7 +945,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                             log.debug("Sending DHT lock request to near node [node=" + n.id() +
                                 ", req=" + req + ']');
 
-                        cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                        cctx.io().send(n, req, cctx.ioPolicy());
                     }
                     catch (ClusterTopologyException e) {
                         fut.onResult(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 64c0811..f72f62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -41,7 +41,6 @@ import java.util.*;
 
 import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
 
@@ -438,7 +437,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         if (res != null) {
             try {
                 // Reply back to sender.
-                ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                ctx.io().send(nodeId, res, ctx.ioPolicy());
             }
             catch (ClusterTopologyException ignored) {
                 U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
@@ -1107,7 +1106,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         try {
             // Don't send reply message to this node or if lock was cancelled.
             if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
-                ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                ctx.io().send(nearNode, res, ctx.ioPolicy());
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" +
@@ -1428,7 +1427,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                 req.completedVersions(committed, rolledback);
 
-                ctx.io().send(n, req);
+                ctx.io().send(n, req, ctx.ioPolicy());
             }
             catch (ClusterTopologyException ignore) {
                 if (log.isDebugEnabled())
@@ -1456,7 +1455,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                     req.completedVersions(committed, rolledback);
 
-                    ctx.io().send(n, req);
+                    ctx.io().send(n, req, ctx.ioPolicy());
                 }
                 catch (ClusterTopologyException ignore) {
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index b5c7927..eff678a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -37,7 +37,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  *
@@ -351,7 +350,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 req.writeVersion(tx.writeVersion());
 
             try {
-                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                cctx.io().send(n, req, tx.ioPolicy());
 
                 if (sync)
                     res = true;
@@ -416,7 +415,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     req.writeVersion(tx.writeVersion());
 
                 try {
-                    cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
 
                     if (sync)
                         res = true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 4d62ecf..78e0c45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -37,7 +37,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
 
 /**
@@ -633,7 +632,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
                 nearFinMiniId, err);
 
             try {
-                cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                cctx.io().send(nearNodeId, res, ioPolicy());
             }
             catch (ClusterTopologyException ignored) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 947bec4..d918449 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -40,7 +40,6 @@ import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.transactions.IgniteTxState.*;
 import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  *
@@ -287,7 +286,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t);
 
                 try {
-                    cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e);
@@ -400,7 +399,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     res.pending(localDhtPendingVersions(tx.writeEntries(), min));
 
-                    cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
                 }
 
                 return true;
@@ -690,7 +689,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                 //noinspection TryWithIdenticalCatches
                 try {
-                    cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(n, req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyException e) {
                     fut.onResult(e);
@@ -744,7 +743,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                     //noinspection TryWithIdenticalCatches
                     try {
-                        cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                        cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
                     }
                     catch (ClusterTopologyException e) {
                         fut.onResult(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 94217f4..2d0ce60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -393,7 +393,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                 add(fut); // Append new future.
 
                 try {
-                    cctx.io().send(n, req);
+                    cctx.io().send(n, req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9b7c788..efd952b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2528,7 +2528,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         try {
             if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC)
-                ctx.io().send(nodeId, res);
+                ctx.io().send(nodeId, res, ctx.ioPolicy());
             else {
                 // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
                 sendDeferredUpdateResponse(nodeId, req.futureVersion());
@@ -2639,7 +2639,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse<K, V> res) {
         try {
-            ctx.io().send(nodeId, res);
+            ctx.io().send(nodeId, res, ctx.ioPolicy());
         }
         catch (ClusterTopologyException ignored) {
             U.warn(log, "Failed to send near update reply to node because it left grid: " +
@@ -2922,7 +2922,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 ctx.gate().enter();
 
                 try {
-                    ctx.io().send(nodeId, msg);
+                    ctx.io().send(nodeId, msg, ctx.ioPolicy());
                 }
                 finally {
                     ctx.gate().leave();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3b7e79d..336a9d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -352,7 +352,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     if (log.isDebugEnabled())
                         log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req);
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (ClusterTopologyException ignored) {
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 5cd8d54..e604c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -790,7 +790,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                cctx.io().send(req.nodeId(), req);
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn<V>(null, true));
@@ -824,7 +824,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    cctx.io().send(req.nodeId(), req);
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
                     addFailedKeys(req.keys(), e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7811713..504e2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -503,7 +503,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
                     // We don't wait for reply to this message.
-                    ctx.io().send(n, req);
+                    ctx.io().send(n, req, ctx.ioPolicy());
             }
         }
         catch (IgniteCheckedException ex) {
@@ -585,7 +585,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                     req.completedVersions(committed, rolledback);
 
                     // We don't wait for reply to this message.
-                    ctx.io().send(n, req);
+                    ctx.io().send(n, req, ctx.ioPolicy());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e0edcad..c76fb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -44,7 +44,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  * Colocated cache lock future.
@@ -870,7 +869,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     if (log.isDebugEnabled())
                         log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
 
-                    cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(node, req, cctx.ioPolicy());
                 }
                 catch (ClusterTopologyException ex) {
                     assert fut != null;
@@ -885,7 +884,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                             if (log.isDebugEnabled())
                                 log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
 
-                            cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                            cctx.io().send(node, req, cctx.ioPolicy());
                         }
                         catch (ClusterTopologyException ex) {
                             assert fut != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 16a56e2..cd14126 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -260,7 +260,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                             log.debug("Sending force key request [cacheName=" + cctx.name() + "node=" + n.id() +
                                 ", req=" + req + ']');
 
-                        cctx.io().send(n, req);
+                        cctx.io().send(n, req, cctx.ioPolicy());
                     }
                     catch (IgniteCheckedException e) {
                         // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index fde727e..f9cc203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -613,7 +613,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                         log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
 
                     // Send demand message.
-                    cctx.io().send(node, d);
+                    cctx.io().send(node, d, cctx.ioPolicy());
 
                     // While.
                     // =====

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 865a16e..52a4ed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -245,9 +245,6 @@ class GridDhtPartitionSupplyPool<K, V> {
 
             boolean ack = false;
 
-            // If demander node left grid.
-            boolean nodeLeft = false;
-
             boolean convertPortable = cctx.portableEnabled() && cctx.offheapTiered();
 
             try {
@@ -300,11 +297,8 @@ class GridDhtPartitionSupplyPool<K, V> {
                             if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
                                 ack = true;
 
-                                if (!reply(node, d, s)) {
-                                    nodeLeft = true;
-
+                                if (!reply(node, d, s))
                                     return;
-                                }
 
                                 // Throttle preloading.
                                 if (preloadThrottle > 0)
@@ -355,11 +349,8 @@ class GridDhtPartitionSupplyPool<K, V> {
                                         if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
                                             ack = true;
 
-                                            if (!reply(node, d, s)) {
-                                                nodeLeft = true;
-
+                                            if (!reply(node, d, s))
                                                 return;
-                                            }
 
                                             // Throttle preloading.
                                             if (preloadThrottle > 0)
@@ -453,11 +444,8 @@ class GridDhtPartitionSupplyPool<K, V> {
                                 if (s.messageSize() >= cctx.config().getPreloadBatchSize()) {
                                     ack = true;
 
-                                    if (!reply(node, d, s)) {
-                                        nodeLeft = true;
-
+                                    if (!reply(node, d, s))
                                         return;
-                                    }
 
                                     s = new GridDhtPartitionSupplyMessage<>(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
@@ -510,7 +498,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                 if (log.isDebugEnabled())
                     log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-                cctx.io().sendOrderedMessage(n, d.topic(), s, d.timeout());
+                cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
 
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 219737f..c49e439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -41,6 +41,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
 /**
  * Future for exchanging partition maps.
  */
@@ -558,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
 
-        cctx.io().send(node, m);
+        cctx.io().send(node, m, SYSTEM_POOL);
     }
 
     /**
@@ -583,7 +585,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                 ", exchId=" + exchId + ", msg=" + m + ']');
 
-        cctx.io().safeSend(nodes, m, null);
+        cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
     }
 
     /**
@@ -997,7 +999,7 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
             if (!remaining.isEmpty()) {
                 try {
                     cctx.io().safeSend(cctx.discovery().nodes(remaining),
-                        new GridDhtPartitionsSingleRequest<K, V>(exchId), null);
+                        new GridDhtPartitionsSingleRequest<K, V>(exchId), SYSTEM_POOL, null);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d5e9714..97e0ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -379,7 +379,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
 
-            cctx.io().send(node, res);
+            cctx.io().send(node, res, cctx.ioPolicy());
         }
         catch (ClusterTopologyException ignore) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index bd2f1d8..25b1ce6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -399,7 +399,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                 add(fut); // Append new future.
 
                 try {
-                    cctx.io().send(n, req);
+                    cctx.io().send(n, req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d1b56e5..5e370aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -43,7 +43,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  * Cache lock future.
@@ -1120,7 +1119,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                     if (log.isDebugEnabled())
                         log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
 
-                    cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                    cctx.io().send(node, req, cctx.ioPolicy());
                 }
                 catch (ClusterTopologyException ex) {
                     assert fut != null;
@@ -1135,7 +1134,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                             if (log.isDebugEnabled())
                                 log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
 
-                            cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                            cctx.io().send(node, req, cctx.ioPolicy());
                         }
                         catch (ClusterTopologyException ex) {
                             assert fut != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index d790ac3..268e6b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -577,7 +577,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                     dht.removeLocks(ctx.nodeId(), req.version(), locKeys, true);
                 else if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys()))
                     // We don't wait for reply to this message.
-                    ctx.io().send(n, req);
+                    ctx.io().send(n, req, ctx.ioPolicy());
             }
         }
         catch (IgniteCheckedException ex) {
@@ -680,7 +680,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                     req.completedVersions(committed, rolledback);
 
                     // We don't wait for reply to this message.
-                    ctx.io().send(n, req);
+                    ctx.io().send(n, req, ctx.ioPolicy());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index ae0efd5..c714db2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -38,7 +38,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 
 /**
@@ -386,7 +385,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 cctx.tm().beforeFinishRemote(n.id(), tx.threadId());
 
             try {
-                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                cctx.io().send(n, req, tx.ioPolicy());
 
                 // If we don't wait for result, then mark future as done.
                 if (!isSync() && !m.explicitLock())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 8df91e0..f658715 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -41,7 +41,6 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 
 /**
@@ -652,7 +651,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             add(fut); // Append new future.
 
             try {
-                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                cctx.io().send(n, req, tx.ioPolicy());
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index b80b2b6..83b1053 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -112,7 +112,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
             });
 
             if (!nodes.isEmpty()) {
-                cctx.io().safeSend(nodes, req,
+                cctx.io().safeSend(nodes, req, cctx.ioPolicy(),
                     new P1<ClusterNode>() {
                         @Override public boolean apply(ClusterNode node) {
                             onNodeLeft(node.id());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index cb7e9eb..431af1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -273,6 +273,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                     node,
                     topic,
                     res,
+                    cctx.ioPolicy(),
                     timeout > 0 ? timeout : Long.MAX_VALUE);
 
                 return true;
@@ -417,6 +418,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
     @Override protected boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo,
         @Nullable List<GridQueryFieldMetadata> metadata,
         @Nullable Collection<?> entities,
@@ -687,7 +689,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
         // For example, a remote reducer has a state, we should not serialize and then send
         // the reducer changed by the local node.
         if (!F.isEmpty(rmtNodes)) {
-            cctx.io().safeSend(rmtNodes, req, new P1<ClusterNode>() {
+            cctx.io().safeSend(rmtNodes, req, cctx.ioPolicy(), new P1<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     fut.onNodeLeft(node.id());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 10843ec..9400636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
 import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 import static org.apache.ignite.transactions.IgniteTxIsolation.*;
@@ -419,6 +421,11 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public GridIoPolicy ioPolicy() {
+        return sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean storeUsed() {
         return storeEnabled() && store() != null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
index 63e4786..da40532 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.lang.*;
@@ -78,6 +79,11 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject {
     public boolean system();
 
     /**
+     * @return Pool where messages for the given transaction must be processed.
+     */
+    public GridIoPolicy ioPolicy();
+
+    /**
      * @return Last recorded topology version.
      */
     public long topologyVersion();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb74c5ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fb94cd2..8c8da1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -484,7 +484,7 @@ public class IgniteTxHandler<K, V> {
                 req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
 
             try {
-                ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                ctx.io().send(nodeId, res, tx.ioPolicy());
             }
             catch (Throwable e) {
                 // Double-check.
@@ -1406,7 +1406,7 @@ public class IgniteTxHandler<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
 
-            ctx.io().send(nodeId, res);
+            ctx.io().send(nodeId, res, SYSTEM_POOL);
         }
         catch (ClusterTopologyException ignored) {
             if (log.isDebugEnabled())
@@ -1506,7 +1506,7 @@ public class IgniteTxHandler<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
 
-            ctx.io().send(nodeId, res);
+            ctx.io().send(nodeId, res, SYSTEM_POOL);
         }
         catch (ClusterTopologyException ignored) {
             if (log.isDebugEnabled())


Mime
View raw message