ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: Fixed wrong 'send' method usage in GridIoManager.
Date Fri, 03 Mar 2017 13:39:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 50f87419e -> 93e199621


Fixed wrong 'send' method usage in GridIoManager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93e19962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93e19962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93e19962

Branch: refs/heads/ignite-2.0
Commit: 93e19962114194072151840198f04f3406be068a
Parents: 50f8741
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Mar 3 16:39:22 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Mar 3 16:39:22 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridJobSiblingImpl.java     |   4 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../checkpoint/GridCheckpointManager.java       |   2 +-
 .../managers/communication/GridIoManager.java   | 206 +++++++------------
 .../deployment/GridDeploymentCommunication.java |   6 +-
 .../eventstorage/GridEventStorageManager.java   |   6 +-
 .../processors/cache/GridCacheIoManager.java    |   8 +-
 .../cache/transactions/IgniteTxManager.java     |   6 +-
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../continuous/GridContinuousProcessor.java     |   2 +-
 .../datastreamer/DataStreamProcessor.java       |   2 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../internal/processors/igfs/IgfsContext.java   |   8 +-
 .../processors/job/GridJobProcessor.java        |   4 +-
 .../internal/processors/job/GridJobWorker.java  |   2 +-
 .../marshaller/ClientRequestFuture.java         |   2 +-
 .../GridMarshallerMappingProcessor.java         |   2 +-
 .../handlers/task/GridTaskCommandHandler.java   |   4 +-
 .../processors/task/GridTaskProcessor.java      |   2 +-
 .../processors/task/GridTaskWorker.java         |   4 +-
 .../GridCommunicationSendMessageSelfTest.java   |   2 +-
 .../communication/GridIoManagerSelfTest.java    |  28 +--
 .../nio/IgniteExceptionInNioWorkerSelfTest.java |   2 +-
 .../communication/GridIoManagerBenchmark.java   |   4 +-
 .../communication/GridIoManagerBenchmark0.java  |  14 +-
 .../communication/GridCacheMessageSelfTest.java |   2 +-
 .../hadoop/shuffle/HadoopShuffle.java           |   3 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 30 files changed, 140 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 2d95f85..79ac416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -150,7 +150,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
 
             if (!nodes.isEmpty()) {
                 try {
-                    ctx.io().send(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     throw U.convertException(e);
@@ -169,7 +169,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
                 ctx.job().cancelJob(ses.getId(), jobId, false);
             else {
                 try {
-                    ctx.io().send(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     // Avoid stack trace for left nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index e864916..d993376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -391,7 +391,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
 
                         try {
                             if (msg instanceof Message)
-                                ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL);
+                                ctx.io().sendToCustomTopic(node, topic, (Message)msg, SYSTEM_POOL);
                             else
                                 ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 9124caf..8ce8b0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -239,7 +239,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
                             ClusterNode node = ctx.discovery().node(ses.getTaskNodeId());
 
                             if (node != null)
-                                ctx.io().send(
+                                ctx.io().sendToGridTopic(
                                     node,
                                     TOPIC_CHECKPOINT,
                                     new GridCheckpointRequest(ses.getId(), key, ses.getCheckpointSpi()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 38b5441..5e91ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -191,6 +191,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
 
+    /** No-op runnable. */
+    private static final IgniteRunnable NOOP = new IgniteRunnable() {
+        @Override public void run() {
+            // No-op.
+        }
+    };
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -328,7 +335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     res.flags(msg0.flags());
 
                     try {
-                        send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+                        sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
@@ -367,7 +374,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             ClusterNode node = nodes.get(i);
 
             try {
-                send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+                sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 ioTestMap().remove(msg.id());
@@ -397,7 +404,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         ioTestMap().put(id, fut);
 
         try {
-            send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+            sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
             ioTestMap().remove(msg.id());
@@ -791,8 +798,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 finally {
                     threadProcessingMessage(false);
 
-                    if (msgC != null)
-                        msgC.run();
+                    msgC.run();
                 }
             }
 
@@ -915,46 +921,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * Remove listener if it matches expected value.
      *
      * @param topic Topic.
-     * @param expected Listener.
+     * @param exp Listener.
      * @return Result.
      */
-    private boolean listenerRemove0(Object topic, GridMessageListener expected) {
+    private boolean listenerRemove0(Object topic, GridMessageListener exp) {
         if (topic instanceof GridTopic) {
             synchronized (sysLsnrsMux) {
-                return systemListenerChange(topic, expected, null);
+                return systemListenerChange(topic, exp, null);
             }
         }
         else
-            return lsnrMap.remove(topic, expected);
+            return lsnrMap.remove(topic, exp);
     }
 
     /**
      * Replace listener.
      *
      * @param topic Topic.
-     * @param expected Old value.
+     * @param exp Old value.
      * @param newVal New value.
      * @return Result.
      */
-    private boolean listenerReplace0(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+    private boolean listenerReplace0(Object topic, GridMessageListener exp, GridMessageListener newVal) {
         if (topic instanceof GridTopic) {
             synchronized (sysLsnrsMux) {
-                return systemListenerChange(topic, expected, newVal);
+                return systemListenerChange(topic, exp, newVal);
             }
         }
         else
-            return lsnrMap.replace(topic, expected, newVal);
+            return lsnrMap.replace(topic, exp, newVal);
     }
 
     /**
      * Change system listener.
      *
      * @param topic Topic.
-     * @param expected Expected value.
+     * @param exp Expected value.
      * @param newVal New value.
      * @return Result.
      */
-    private boolean systemListenerChange(Object topic, GridMessageListener expected, GridMessageListener newVal) {
+    private boolean systemListenerChange(Object topic, GridMessageListener exp, GridMessageListener newVal) {
         assert Thread.holdsLock(sysLsnrsMux);
         assert topic instanceof GridTopic;
 
@@ -962,7 +968,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         GridMessageListener old = sysLsnrs[idx];
 
-        if (old != null && old.equals(expected)) {
+        if (old != null && old.equals(exp)) {
             changeSystemListener(idx, newVal);
 
             return true;
@@ -1263,6 +1269,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert node != null;
         assert topic != null;
         assert msg != null;
+        assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
+        assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
 
         GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
 
@@ -1276,11 +1284,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
-            else if (async) {
-                assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging.
-
-                processRegularMessage(locNodeId, ioMsg, plc, null);
-            }
+            else if (async)
+                processRegularMessage(locNodeId, ioMsg, plc, NOOP);
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
@@ -1313,14 +1318,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(UUID nodeId, Object topic, Message msg, byte plc)
+    public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 
-        send(node, topic, msg, plc);
+        sendToCustomTopic(node, topic, msg, plc);
     }
 
     /**
@@ -1331,7 +1336,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public void send(UUID nodeId, GridTopic topic, Message msg, byte plc)
+    public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
         ClusterNode node = ctx.discovery().node(nodeId);
 
@@ -1348,7 +1353,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc)
+    public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
         send(node, topic, -1, msg, plc, false, 0, false, null, false);
     }
@@ -1358,12 +1363,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param async Async flag.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+    public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1374,7 +1378,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+    public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
         throws IgniteCheckedException {
         send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
     }
@@ -1402,33 +1406,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
-    }
-
-    /**
      * @param node Destination nodes.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
@@ -1436,8 +1413,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
-        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+    public void sendToGridTopic(ClusterNode node,
+        GridTopic topic,
+        Message msg,
+        byte plc,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
+    {
         send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
     }
 
@@ -1450,9 +1431,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(
+    void sendOrderedMessageToGridTopic(
         Collection<? extends ClusterNode> nodes,
-        Object topic,
+        GridTopic topic,
         Message msg,
         byte plc,
         long timeout,
@@ -1461,36 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
-    }
-
-    /**
-     * @param node Destination nodes.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC)
-        throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
-    }
-
-    /**
-     * @param nodes Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void send(
-        Collection<? extends ClusterNode> nodes,
-        Object topic,
-        Message msg,
-        byte plc
-    ) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, false, 0, false);
+        send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout);
     }
 
     /**
@@ -1500,7 +1452,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(
+    public void sendToGridTopic(
         Collection<? extends ClusterNode> nodes,
         GridTopic topic,
         Message msg,
@@ -1540,7 +1492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message to send.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
+    void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
         sendUserMessage(nodes, msg, null, false, 0, false);
     }
 
@@ -1556,8 +1508,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings("ConstantConditions")
-    public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg,
-        @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException {
+    public void sendUserMessage(Collection<? extends ClusterNode> nodes,
+        Object msg,
+        @Nullable Object topic,
+        boolean ordered,
+        long timeout,
+        boolean async) throws IgniteCheckedException
+    {
         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
 
         byte[] serMsg = null;
@@ -1600,22 +1557,42 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             dep != null ? dep.participants() : null);
 
         if (ordered)
-            sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
-        else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            sendOrderedMessageToGridTopic(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
+        else if (loc) {
+            send(F.first(nodes),
+                TOPIC_COMM_USER,
+                TOPIC_COMM_USER.ordinal(),
+                ioMsg,
+                PUBLIC_POOL,
+                false,
+                0,
+                false,
+                null,
+                async);
+        }
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
             Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
 
             if (!rmtNodes.isEmpty())
-                send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+                sendToGridTopic(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
             // Will call local listeners in current thread synchronously or through pool,
             // depending async flag, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
-            if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
+            if (locNode != null) {
+                send(locNode,
+                    TOPIC_COMM_USER,
+                    TOPIC_COMM_USER.ordinal(),
+                    ioMsg,
+                    PUBLIC_POOL,
+                    false,
+                    0,
+                    false,
+                    null,
+                    async);
+            }
         }
     }
 
@@ -1657,35 +1634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackC Ack closure.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(
-        UUID nodeId,
-        Object topic,
-        Message msg,
-        byte plc,
-        long timeout,
-        boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
-    ) throws IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
-    }
-
-    /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index a571ae4..ffbde37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -294,7 +294,7 @@ class GridDeploymentCommunication {
 
         if (node != null) {
             try {
-                ctx.io().send(node, topic, res, GridIoPolicy.P2P_POOL);
+                ctx.io().sendToCustomTopic(node, topic, res, GridIoPolicy.P2P_POOL);
 
                 if (log.isDebugEnabled())
                     log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
@@ -324,7 +324,7 @@ class GridDeploymentCommunication {
         Message req = new GridDeploymentRequest(null, null, rsrcName, true);
 
         if (!rmtNodes.isEmpty()) {
-            ctx.io().send(
+            ctx.io().sendToGridTopic(
                 rmtNodes,
                 TOPIC_CLASSLOAD,
                 req,
@@ -445,7 +445,7 @@ class GridDeploymentCommunication {
             if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
                 req.responseTopicBytes(U.marshal(marsh, req.responseTopic()));
 
-            ctx.io().send(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
+            ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);
 
             if (log.isDebugEnabled())
                 log.debug("Sent peer class loading request [node=" + dstNode.id() + ", req=" + req + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index b5d5ee2..656c739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1040,12 +1040,12 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
 
         if (locNode != null)
-            ctx.io().send(locNode, topic, msg, plc);
+            ctx.io().sendToGridTopic(locNode, topic, msg, plc);
 
         if (!rmtNodes.isEmpty()) {
             msg.responseTopicBytes(U.marshal(marsh, msg.responseTopic()));
 
-            ctx.io().send(rmtNodes, topic, msg, plc);
+            ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
         }
     }
 
@@ -1164,7 +1164,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
                         res.exceptionBytes(U.marshal(marsh, res.exception()));
                     }
 
-                    ctx.io().send(node, req.responseTopic(), res, PUBLIC_POOL);
+                    ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index d20310b..50f58cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -908,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             try {
                 cnt++;
 
-                cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+                cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
 
                 return;
             }
@@ -969,7 +969,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     }
                 });
 
-                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+                cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc);
 
                 boolean added = false;
 
@@ -1116,7 +1116,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param plc IO policy.
      * @throws IgniteCheckedException If send failed.
      */
-    public void sendNoRetry(ClusterNode node,
+    void sendNoRetry(ClusterNode node,
         GridCacheMessage msg,
         byte plc)
         throws IgniteCheckedException {
@@ -1127,7 +1127,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             return;
 
         try {
-            cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+            cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
 
             if (log.isDebugEnabled())
                 log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..f4a5629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2065,7 +2065,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 if (!cctx.localNodeId().equals(nodeId))
                     req.prepareMarshal(cctx);
 
-                cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL);
+                cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 if (e instanceof ClusterTopologyCheckedException) {
@@ -2508,7 +2508,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         if (!cctx.localNodeId().equals(nodeId))
                             res.prepareMarshal(cctx);
 
-                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                        cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
@@ -2545,7 +2545,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     res.futureId(req.futureId());
 
                     try {
-                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                        cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 257d0d9..d644261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -320,7 +320,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
                     snapshot.version(), snapshot.deltas());
 
                 try {
-                    ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     if (ctx.discovery().pingNodeNoError(n.id()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 575bc69..55f65c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1397,7 +1397,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                             ackC);
                     }
                     else
-                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
+                        ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
 
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index b6400e8..74d5f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -420,7 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
 
         try {
-            ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
+            ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy());
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index f97fc14..4c1de2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1693,7 +1693,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     topVer);
 
                 try {
-                    ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
+                    ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 4c037b7..0b2558a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -169,7 +170,10 @@ public class IgfsContext {
         if (!kernalContext().localNodeId().equals(nodeId))
             msg.prepareMarshal(kernalContext().config().getMarshaller());
 
-        kernalContext().io().send(nodeId, topic, msg, plc);
+        if (topic instanceof GridTopic)
+            kernalContext().io().sendToGridTopic(nodeId, (GridTopic)topic, msg, plc);
+        else
+            kernalContext().io().sendToCustomTopic(nodeId, topic, msg, plc);
     }
 
     /**
@@ -184,7 +188,7 @@ public class IgfsContext {
         if (!kernalContext().localNodeId().equals(node.id()))
             msg.prepareMarshal(kernalContext().config().getMarshaller());
 
-        kernalContext().io().send(node, topic, msg, plc);
+        kernalContext().io().sendToCustomTopic(node, topic, msg, plc);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 2b6699d..9ed6ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -518,7 +518,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
             ctx.io().addMessageListener(topic, msgLsnr);
 
             // 3. Send message.
-            ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
+            ctx.io().sendToGridTopic(taskNode, TOPIC_JOB_SIBLINGS,
                 new GridJobSiblingsRequest(ses.getId(),
                     loc ? topic : null,
                     loc ? null : U.marshal(marsh, topic)),
@@ -1379,7 +1379,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
             else
                 // Send response to common topic as unordered message.
-                ctx.io().send(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+                ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
             // The only option here is to log, as we must assume that resending will fail too.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index acefde7..9b7615f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -912,7 +912,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                                 ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
                             else
                                 // Send response to common topic as unordered message.
-                                ctx.io().send(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+                                ctx.io().sendToGridTopic(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL);
                         }
                         catch (IgniteCheckedException e) {
                             // Log and invoke the master-leave callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 773dabe..0be4e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -96,7 +96,7 @@ final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult>
                 ClusterNode srvNode = aliveSrvNodes.poll();
 
                 try {
-                    ioMgr.send(
+                    ioMgr.sendToGridTopic(
                             srvNode,
                             GridTopic.TOPIC_MAPPING_MARSH,
                             new MissingMappingRequestMessage(

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index fdea869..66c19a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -178,7 +178,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
             String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
 
             try {
-                ioMgr.send(
+                ioMgr.sendToGridTopic(
                         nodeId,
                         TOPIC_MAPPING_MARSH,
                         new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 947435c..99ba335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -143,7 +143,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
 
                     Object topic = U.unmarshal(ctx, req.topicBytes(), U.resolveClassLoader(ctx.config()));
 
-                    ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
+                    ctx.io().sendToCustomTopic(nodeId, topic, res, SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send job task result response.", e);
@@ -494,7 +494,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
             try {
                 byte[] topicBytes = U.marshal(ctx, topic);
 
-                ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
+                ctx.io().sendToGridTopic(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 String errMsg = "Failed to send task result request [resHolderId=" + resHolderId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d32b51c..ec5d4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1311,7 +1311,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
 
                     boolean loc = ctx.localNodeId().equals(nodeId);
 
-                    ctx.io().send(nodeId, topic,
+                    ctx.io().sendToCustomTopic(nodeId, topic,
                         new GridJobSiblingsResponse(
                             loc ? siblings : null,
                             loc ? null : U.marshal(marsh, siblings)),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index d18ea5f..02ef0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1281,7 +1281,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     ClusterNode node = ctx.discovery().node(nodeId);
 
                     if (node != null)
-                        ctx.io().send(node,
+                        ctx.io().sendToGridTopic(node,
                             TOPIC_JOB_CANCEL,
                             new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
                             PUBLIC_POOL);
@@ -1382,7 +1382,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
                     else {
                         // Send job execution request.
-                        ctx.io().send(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
+                        ctx.io().sendToGridTopic(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL);
 
                         if (log.isDebugEnabled())
                             log.debug("Sent job request [req=" + req + ", node=" + node + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8503b48..f58be87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -129,7 +129,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         long time = System.nanoTime();
 
         for (int i = 1; i <= SAMPLE_CNT; i++) {
-            mgr0.send(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
+            mgr0.sendToCustomTopic(grid(1).localNode(), topic, new TestMessage(), GridIoPolicy.PUBLIC_POOL);
 
             if (i % 500 == 0)
                 info("Sent messages count: " + i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f5499d3..f4257a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -90,21 +90,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
     public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception {
         GridTestUtils.assertThrows(log, new Callable<Object>() {
             @Override public Object call() throws Exception {
-                new GridIoManager(ctx).send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
-                    GridIoPolicy.P2P_POOL);
-
-                return null;
-            }
-        }, AssertionError.class, "Internal Ignite code should never call the method with local node in a node list.");
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                new GridIoManager(ctx).send(F.asList(locNode, rmtNode), new Object(), new TestMessage(),
+                new GridIoManager(ctx).sendToGridTopic(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new TestMessage(),
                     GridIoPolicy.P2P_POOL);
 
                 return null;
@@ -127,12 +113,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+        verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
             eq(GridIoPolicy.PUBLIC_POOL));
 
         Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
 
-        verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+        verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
     }
 
@@ -151,12 +137,12 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+        verify(ioMgr).sendToGridTopic(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
             eq(GridIoPolicy.PUBLIC_POOL));
 
         Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
 
-        verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+        verify(ioMgr).sendToGridTopic(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
     }
 
@@ -175,7 +161,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
             // No-op. We are using mocks so real sending is impossible.
         }
 
-        verify(ioMgr).sendOrderedMessage(
+        verify(ioMgr).sendOrderedMessageToGridTopic(
             argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
             eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class),
@@ -196,7 +182,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
+        @Override public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
             throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
index 9961833..8ac6e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/IgniteExceptionInNioWorkerSelfTest.java
@@ -74,7 +74,7 @@ public class IgniteExceptionInNioWorkerSelfTest extends GridCommonAbstractTest {
             UUID nodeId = ignite(1).cluster().localNode().id();
 
             // This should trigger a failure in a NIO thread.
-            kernal.context().io().send(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
+            kernal.context().io().sendToCustomTopic(nodeId, GridTopic.TOPIC_CACHE.topic("cache"), new BrokenMessage(), (byte)0);
 
             for (int i = 0; i < 100; i++)
                 ignite(0).cache("cache").put(i, i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 723495c..03bbb00 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -249,7 +249,7 @@ public class GridIoManagerBenchmark {
                 testMsg.bytes(null);
 
                 try {
-                    io.send(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
+                    io.sendToCustomTopic(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     e.printStackTrace();
@@ -293,7 +293,7 @@ public class GridIoManagerBenchmark {
                     else
                         sem.acquire();
 
-                    io.send(
+                    io.sendToCustomTopic(
                         dst,
                         TEST_TOPIC,
                         new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null),

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index f2c6255..92b29e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -132,7 +132,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -176,7 +176,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
                     while (!finish.get()) {
                         sem.acquire();
 
-                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                        snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                     }
                 }
                 catch (IgniteCheckedException e) {
@@ -226,7 +226,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -270,7 +270,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                         map.put(msgId, latch);
 
-                        snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                        snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
 
                         latch.await();
 
@@ -326,7 +326,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
                     try {
-                        rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
+                        rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         error("Failed to send message.", e);
@@ -362,7 +362,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                     sem.acquire();
 
-                    snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                    snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
                 }
 
                 return null;
@@ -432,7 +432,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
 
                     latches.put(msgId, latch);
 
-                    snd.send(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
+                    snd.sendToCustomTopic(rcvNode, topic, new GridTestMessage(msgId, (String)null), PUBLIC_POOL);
 
                     long start = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 9c97542..c0ea662 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -194,7 +194,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
             msg.add(mes2);
         }
 
-        mgr0.send(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
+        mgr0.sendToCustomTopic(grid(1).localNode(), topic, msg, GridIoPolicy.PUBLIC_POOL);
 
         assert latch.await(3, SECONDS);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index e67a26a..7575ff4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -145,7 +145,7 @@ public class HadoopShuffle extends HadoopComponent {
         ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
 
         if (msg instanceof Message)
-            ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+            ctx.kernalContext().io().sendToGridTopic(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
         else
             ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
     }
@@ -153,6 +153,7 @@ public class HadoopShuffle extends HadoopComponent {
     /**
      * @param jobId Task info.
      * @return Shuffle job.
+     * @throws IgniteCheckedException If failed.
      */
     private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
         HadoopShuffleJob<UUID> res = jobs.get(jobId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..88cd89b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1890,7 +1890,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         ((GridCacheQueryMarshallable)msg).marshall(marshaller);
                 }
 
-                ctx.io().send(node, topic, topicOrd, msg, plc);
+                ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
             }
             catch (IgniteCheckedException e) {
                 ok = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2802da5..33a6778 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             }
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
             else
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
         }
         catch (Exception e) {
             U.warn(log, "Failed to send retry message: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/ignite/blob/93e19962/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 61ca11d..604e522 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -328,7 +328,7 @@ public class GridReduceQueryExecutor {
                         if (node.isLocal())
                             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
                         else
-                            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
+                            ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
                     }
                     catch (IgniteCheckedException e) {
                         throw new CacheException("Failed to fetch data from node: " + node.id(), e);


Mime
View raw message