ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: Pass io policy in GridMessageListener.
Date Mon, 03 Jul 2017 12:45:59 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 7db925c1c -> bdd31af76


Pass io policy in GridMessageListener.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdd31af7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdd31af7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdd31af7

Branch: refs/heads/master
Commit: bdd31af760f745c9ab05f75d100649a30dfe4f1e
Parents: 7db925c
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 3 15:45:04 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 3 15:45:04 2017 +0300

----------------------------------------------------------------------
 .../checkpoint/GridCheckpointManager.java       |  2 +-
 .../managers/communication/GridIoManager.java   | 10 +--
 .../communication/GridMessageListener.java      |  3 +-
 .../deployment/GridDeploymentCommunication.java |  4 +-
 .../eventstorage/GridEventStorageManager.java   |  4 +-
 .../processors/cache/GridCacheIoManager.java    | 77 +++++++++++---------
 .../cache/binary/BinaryMetadataTransport.java   |  4 +-
 .../cache/transactions/IgniteTxManager.java     |  2 +-
 .../processors/cluster/ClusterProcessor.java    |  2 +-
 .../continuous/GridContinuousProcessor.java     |  4 +-
 .../datastreamer/DataStreamProcessor.java       |  2 +-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../processors/igfs/IgfsDataManager.java        |  2 +-
 .../igfs/IgfsFragmentizerManager.java           |  4 +-
 .../processors/job/GridJobProcessor.java        |  8 +-
 .../GridMarshallerMappingProcessor.java         |  4 +-
 .../processors/query/GridQueryProcessor.java    |  2 +-
 .../handlers/task/GridTaskCommandHandler.java   |  4 +-
 .../processors/task/GridTaskProcessor.java      |  6 +-
 .../jobstealing/JobStealingCollisionSpi.java    |  2 +-
 ...idCommunicationManagerListenersSelfTest.java |  2 +-
 .../GridCommunicationSendMessageSelfTest.java   |  2 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |  2 +-
 ...lerCacheClientRequestsMappingOnMissTest.java |  6 +-
 ...naryObjectMetadataExchangeMultinodeTest.java |  6 +-
 ...DeadlockDetectionMessageMarshallingTest.java |  2 +-
 .../communication/GridIoManagerBenchmark.java   |  4 +-
 .../communication/GridIoManagerBenchmark0.java  | 12 +--
 .../communication/GridCacheMessageSelfTest.java |  2 +-
 .../testframework/GridSpiTestContext.java       |  5 +-
 .../hadoop/shuffle/HadoopShuffle.java           |  2 +-
 .../query/h2/opt/GridH2IndexBase.java           |  2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  2 +-
 34 files changed, 106 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 782ee5e..aae1a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -464,7 +464,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
          * @param msg Received message.
          */
         @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCheckpointRequest req = (GridCheckpointRequest)msg;
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a1ddaf4..3ff44a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -337,7 +337,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             log.debug(startInfo());
 
         addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 ClusterNode node = ctx.discovery().node(nodeId);
 
                 if (node == null)
@@ -1553,7 +1553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             CUR_PLC.set(plc);
 
         try {
-            lsnr.onMessage(nodeId, msg);
+            lsnr.onMessage(nodeId, msg, plc);
         }
         finally {
             if (change)
@@ -2323,14 +2323,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
          * @param nodeId Node ID.
          * @param msg Message.
          */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridMessageListener[] arr0 = arr;
 
             if (arr0 == null)
                 return;
 
             for (GridMessageListener l : arr0)
-                l.onMessage(nodeId, msg);
+                l.onMessage(nodeId, msg, plc);
         }
 
         /**
@@ -2430,7 +2430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         /** {@inheritDoc} */
         @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridIoUserMessage)) {
                 U.error(log, "Received unknown message (potentially fatal problem): " + msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
index 3993591..c7de57c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
@@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener {
      * @param nodeId ID of node that sent the message. Note that may have already
      *      left topology by the time this message is received.
      * @param msg Message received.
+     * @param plc Message policy (pool).
      */
-    public void onMessage(UUID nodeId, Object msg);
+    public void onMessage(UUID nodeId, Object msg, byte plc);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 23d186a..2a5f7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -83,7 +83,7 @@ class GridDeploymentCommunication {
         this.log = log.getLogger(getClass());
 
         peerLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 processDeploymentRequest(nodeId, msg);
             }
         };
@@ -422,7 +422,7 @@ class GridDeploymentCommunication {
         };
 
         GridMessageListener resLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index bd43e43..7836004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1008,7 +1008,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
         GridMessageListener resLsnr = new GridMessageListener() {
             @SuppressWarnings("deprecation")
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 
@@ -1185,7 +1185,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      */
     private class RequestListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a920bd0..49cfcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -155,7 +155,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
-        @Override public void onMessage(final UUID nodeId, final Object msg) {
+        @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received unordered cache communication message [nodeId=" + nodeId +
                     ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
@@ -196,7 +196,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                             @Override public void apply(IgniteInternalFuture<?> fut) {
                                 cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                     @Override public void run() {
-                                        handleMessage(nodeId, cacheMsg);
+                                        handleMessage(nodeId, cacheMsg, plc);
                                     }
                                 });
                             }
@@ -269,40 +269,48 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                                     log.debug(msg0.toString());
                                 }
 
-                                handleMessage(nodeId, cacheMsg);
+                                handleMessage(nodeId, cacheMsg, plc);
                             }
                         };
 
                         if (stripe >= 0)
                             cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
-                        else
-                            cctx.kernalContext().closure().runLocalSafe(c);
+                        else {
+                            try {
+                                cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
+                            }
+                        }
                     }
                 });
 
                 return;
             }
 
-            handleMessage(nodeId, cacheMsg);
+            handleMessage(nodeId, cacheMsg, plc);
         }
     };
 
     /**
      * @param nodeId Sender node ID.
      * @param cacheMsg Message.
+     * @param plc Message policy.
      */
     @SuppressWarnings("unchecked")
-    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
-        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers);
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
+        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc);
     }
 
     /**
      * @param nodeId Sender node ID.
      * @param cacheMsg Message.
      * @param msgHandlers Message handlers.
+     * @param plc Message policy.
      */
     @SuppressWarnings("unchecked")
-    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
         Lock lock = rw.readLock();
 
         lock.lock();
@@ -356,7 +364,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 return;
             }
 
-            onMessage0(nodeId, cacheMsg, c);
+            onMessage0(nodeId, cacheMsg, c, plc);
         }
         finally {
             lock.unlock();
@@ -517,10 +525,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param nodeId Node ID.
      * @param cacheMsg Cache message.
      * @param c Handler closure.
+     * @param plc Message policy.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
-        final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+        final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
         try {
             if (stopping) {
                 if (log.isDebugEnabled())
@@ -536,7 +545,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             unmarshall(nodeId, cacheMsg);
 
             if (cacheMsg.classError() != null)
-                processFailedMessage(nodeId, cacheMsg, c);
+                processFailedMessage(nodeId, cacheMsg, c, plc);
             else
                 processMessage(nodeId, cacheMsg, c);
         }
@@ -669,15 +678,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      *
      * @param nodeId Node ID.
      * @param msg Message.
+     * @param c Closure.
+     * @param plc Message policy.
      * @throws IgniteCheckedException If failed.
      */
-    private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c)
+    private void processFailedMessage(UUID nodeId,
+        GridCacheMessage msg,
+        IgniteBiInClosure<UUID, GridCacheMessage> c,
+        byte plc)
         throws IgniteCheckedException {
         assert msg != null;
 
-        GridCacheContext ctx = msg instanceof GridCacheIdMessage ?
-            cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null;
-
         switch (msg.directType()) {
             case 30: {
                 GridDhtLockRequest req = (GridDhtLockRequest)msg;
@@ -688,9 +699,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.futureId(),
                     req.miniId(),
                     0,
-                    ctx.deploymentEnabled());
+                    false);
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -723,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
 
                 if (req.nearNodeId() != null) {
                     GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -734,7 +745,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     nearRes.errors(new UpdateErrors(req.classError()));
 
-                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
                 }
             }
 
@@ -753,7 +764,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -770,7 +781,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -793,7 +804,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -831,7 +842,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     null,
                     false);
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -872,7 +883,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     cctx.node(nodeId),
                     TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
                     res,
-                    ctx.ioPolicy(),
+                    plc,
                     Long.MAX_VALUE);
             }
 
@@ -897,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -935,7 +946,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -953,7 +964,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -971,7 +982,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
             }
 
             break;
@@ -987,7 +998,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+                sendResponseOnFailedMessage(nodeId, res, cctx, plc);
 
                 if (req.nearNodeId() != null) {
                     GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
@@ -998,7 +1009,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     nearRes.errors(new UpdateErrors(req.classError()));
 
-                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy());
+                    sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
                 }
             }
 
@@ -1540,7 +1551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         /** {@inheritDoc} */
         @SuppressWarnings({"CatchGenericClass", "unchecked"})
-        @Override public void onMessage(final UUID nodeId, Object msg) {
+        @Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
 
@@ -1551,7 +1562,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             try {
                 GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
-                onMessage0(nodeId, cacheMsg, c);
+                onMessage0(nodeId, cacheMsg, c, plc);
             }
             finally {
                 lock.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 00c760f..5fd7295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -561,7 +561,7 @@ final class BinaryMetadataTransport {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataRequestMessage : msg;
 
             MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
@@ -606,7 +606,7 @@ final class BinaryMetadataTransport {
     private final class MetadataResponseListener implements GridMessageListener {
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataResponseMessage : msg;
 
             MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a9aa13d..3a3b766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2433,7 +2433,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private class DeadlockDetectionListener implements GridMessageListener {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
             Throwable err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index ed83650..0bd2370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -146,7 +146,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgniteDiagnosticMessage) {
                     IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f641399..b1d3442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -289,7 +289,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object obj) {
+            @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                 GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                 if (msg.data() == null && msg.dataBytes() != null) {
@@ -771,7 +771,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     private void registerMessageListener(GridContinuousHandler hnd) {
         if (hnd.orderedTopic() != null) {
             ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
+                @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                     GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                     // Only notification can be ordered.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c52f7ac..31ae1e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -82,7 +82,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
         if (!ctx.clientNode()) {
             ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     assert msg instanceof DataStreamerRequest;
 
                     processRequest(nodeId, (DataStreamerRequest)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 40988d3..ae441de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -319,7 +319,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
 
         ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert msg instanceof DataStreamerResponse;
 
                 DataStreamerResponse res = (DataStreamerResponse)msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 8ae6db8..a4ea337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -158,7 +158,7 @@ public class IgfsDataManager extends IgfsManager {
         topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
 
         igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgfsBlocksMessage)
                     processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
                 else if (msg instanceof IgfsAckMessage)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 7797f89..0c75bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerResponse) {
                 IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
 
@@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerRequest ||
                 msg instanceof IgfsSyncMessage) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 408396a..5ae4da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -447,7 +447,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridJobSiblingsResponse res = null;
 
@@ -1856,7 +1856,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobSessionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1872,7 +1872,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobCancelListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1890,7 +1890,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobExecutionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 8de6c49..2543042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -168,7 +168,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MissingMappingRequestMessage : msg;
 
             MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
@@ -200,7 +200,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
      */
     private final class MissingMappingResponseListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MissingMappingResponseMessage : msg;
 
             MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index dd07584..d55a129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -207,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         valCtx = new CacheQueryObjectValueContext(ctx);
 
         ioLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof SchemaOperationStatusMessage) {
                     SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 99ba335..d9b49cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         super(ctx);
 
         ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!(msg instanceof GridTaskResultRequest)) {
                     U.warn(log, "Received unexpected message instead of task result request: " + msg);
 
@@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridTaskResultResponse res = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6ae97dd..d7a022e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1282,7 +1282,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof GridJobExecuteResponse)
                 processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);
             else if (jobResOnly)
@@ -1326,7 +1326,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class JobSiblingsMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridJobSiblingsRequest)) {
                 U.warn(log, "Received unexpected message instead of siblings request: " + msg);
 
@@ -1398,7 +1398,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class TaskCancelMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg != null;
 
             if (!(msg instanceof GridTaskCancelRequest)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 6f2c099..39a8e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -648,7 +648,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
 
         spiCtx.addMessageListener(
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     MessageInfo info = rcvMsgMap.get(nodeId);
 
                     if (info == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
index 1738da8..03b7921 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
@@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             // No-op.
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 29b7847..3563c77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -143,7 +143,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msgCls.isInstance(msg))
                     latch.countDown();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index 308f2b4..d5988f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -224,7 +224,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
                 ((IgniteKernal)g).context().io().addMessageListener(
                     TOPIC_CACHE,
                     new GridMessageListener() {
-                        @Override public void onMessage(UUID nodeId, Object msg) {
+                        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                             info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
                             if (msg instanceof GridNearSingleGetRequest) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
index f5f2512..057b970 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
@@ -300,10 +300,10 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
         final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_MAPPING_MARSH.ordinal()];
 
         GridMessageListener wrapper = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 mappingReqsCounter.incrementAndGet();
 
-                delegate.onMessage(nodeId, msg);
+                delegate.onMessage(nodeId, msg, plc);
             }
         };
 
@@ -321,7 +321,7 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo
         ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH);
 
         ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 new Thread(new Runnable() {
                     @Override public void run() {
                         mappingReqsCounter.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
index 9370e27..6ff703e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -396,7 +396,7 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
         ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
 
         ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 new Thread(new Runnable() {
                     @Override public void run() {
                         metadataReqsCounter.incrementAndGet();
@@ -416,9 +416,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
         final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()];
 
         GridMessageListener wrapper = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 metadataReqsCounter.incrementAndGet();
-                delegate.onMessage(nodeId, msg);
+                delegate.onMessage(nodeId, msg, plc);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
index 9126053..1a48cec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java
@@ -76,7 +76,7 @@ public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstrac
             final AtomicBoolean res = new AtomicBoolean();
 
             clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     if (msg instanceof TxLocksResponse) {
                         try {
                             ((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader());

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 03bbb00..5671158 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -240,7 +240,7 @@ public class GridIoManagerBenchmark {
         GridMessageListener lsnr = new GridMessageListener() {
             private ClusterNode node;
 
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (node == null)
                     node = g.context().discovery().node(nodeId);
 
@@ -336,7 +336,7 @@ public class GridIoManagerBenchmark {
      */
     private static class SenderMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             msgCntr.increment();
 
             if (testLatency)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index 0f0332f..7b1d972 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();
@@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 map.get(((GridTestMessage)msg).id()).countDown();
             }
         });
@@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 4a6b765..435af8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 try {
                     latch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 7d3c5d6..93cd911 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -328,7 +329,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     @SuppressWarnings("deprecation")
     public void triggerMessage(ClusterNode node, Object msg) {
         for (GridMessageListener lsnr : msgLsnrs)
-            lsnr.onMessage(node.id(), msg);
+            lsnr.onMessage(node.id(), msg, GridIoPolicy.SYSTEM_POOL);
     }
 
     /** {@inheritDoc} */
@@ -667,7 +668,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
         @SuppressWarnings({
             "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
 
             Object msgBody = ioMsg.body();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 3296993..cd1c93c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -63,7 +63,7 @@ public class HadoopShuffle extends HadoopComponent {
         super.start(ctx);
 
         ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 onMessageReceived(nodeId, (HadoopMessage)msg);
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 3dabc58..542adf0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -135,7 +135,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
 
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     GridSpinBusyLock l = desc.indexing().busyLock();
 
                     if (!l.enterBusy())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6b7ba75..fcf5f10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -172,7 +172,7 @@ public class GridMapQueryExecutor {
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b85fa61..85a7e0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -169,7 +169,7 @@ public class GridReduceQueryExecutor {
         log = ctx.log(GridReduceQueryExecutor.class);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 


Mime
View raw message