ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: gg-12133
Date Mon, 26 Jun 2017 09:39:43 GMT
Repository: ignite
Updated Branches:
  refs/heads/gg-12133 [created] ecd9d1c1e


gg-12133


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecd9d1c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecd9d1c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecd9d1c1

Branch: refs/heads/gg-12133
Commit: ecd9d1c1ea512add29a1d27df9f6c608e8b9d7c9
Parents: b1736c0
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jun 26 12:39:30 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jun 26 12:39:30 2017 +0300

----------------------------------------------------------------------
 .../checkpoint/GridCheckpointManager.java       |   2 +-
 .../managers/communication/GridIoManager.java   |   8 +-
 .../communication/GridMessageListener.java      |   3 +-
 .../deployment/GridDeploymentCommunication.java |   4 +-
 .../eventstorage/GridEventStorageManager.java   |   4 +-
 .../processors/cache/GridCacheIoManager.java    |  37 ++--
 .../cache/transactions/IgniteTxManager.java     |   2 +-
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../datastreamer/DataStreamProcessor.java       |   2 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../processors/igfs/IgfsDataManager.java        |   2 +-
 .../igfs/IgfsFragmentizerManager.java           |   4 +-
 .../processors/job/GridJobProcessor.java        |   8 +-
 .../handlers/task/GridTaskCommandHandler.java   |   4 +-
 .../processors/task/GridTaskProcessor.java      |   6 +-
 .../jobstealing/JobStealingCollisionSpi.java    |   2 +-
 .../internal/TestRecordingCommunicationSpi.java |  29 +++
 ...idCommunicationManagerListenersSelfTest.java |   2 +-
 .../GridCommunicationSendMessageSelfTest.java   |   2 +-
 .../cache/GridCachePartitionedGetSelfTest.java  |   2 +-
 .../IgniteBinaryMetadataUpdateFromInvoke.java   | 187 +++++++++++++++++++
 .../communication/GridIoManagerBenchmark.java   |   4 +-
 .../communication/GridIoManagerBenchmark0.java  |  12 +-
 .../communication/GridCacheMessageSelfTest.java |   2 +-
 .../testframework/GridSpiTestContext.java       |   4 +-
 .../IgniteCacheRestartTestSuite2.java           |   2 +
 .../query/h2/opt/GridH2IndexBase.java           |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 30 files changed, 288 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 9124caf..40c2c63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -449,7 +449,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
          * @param msg Received message.
          */
         @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCheckpointRequest req = (GridCheckpointRequest)msg;
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3df29cf..fe89d37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1079,7 +1079,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             CUR_PLC.set(plc);
 
         try {
-            lsnr.onMessage(nodeId, msg);
+            lsnr.onMessage(nodeId, msg, plc);
         }
         finally {
             if (change)
@@ -1905,14 +1905,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
          * @param nodeId Node ID.
          * @param msg Message.
          */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridMessageListener[] arr0 = arr;
 
             if (arr0 == null)
                 return;
 
             for (GridMessageListener l : arr0)
-                l.onMessage(nodeId, msg);
+                l.onMessage(nodeId, msg, plc);
         }
 
         /**
@@ -2012,7 +2012,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         /** {@inheritDoc} */
         @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridIoUserMessage)) {
                 U.error(log, "Received unknown message (potentially fatal problem): " + msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
index 3993591..c7de57c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java
@@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener {
      * @param nodeId ID of node that sent the message. Note that may have already
      *      left topology by the time this message is received.
      * @param msg Message received.
+     * @param plc Message policy (pool).
      */
-    public void onMessage(UUID nodeId, Object msg);
+    public void onMessage(UUID nodeId, Object msg, byte plc);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index a571ae4..661bf68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -82,7 +82,7 @@ class GridDeploymentCommunication {
         this.log = log.getLogger(getClass());
 
         peerLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 processDeploymentRequest(nodeId, msg);
             }
         };
@@ -416,7 +416,7 @@ class GridDeploymentCommunication {
         };
 
         GridMessageListener resLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 607bb96..b77ec27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -877,7 +877,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
         GridMessageListener resLsnr = new GridMessageListener() {
             @SuppressWarnings("deprecation")
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert nodeId != null;
                 assert msg != null;
 
@@ -1054,7 +1054,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      */
     private class RequestListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..39087b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -127,7 +128,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
-        @Override public void onMessage(final UUID nodeId, final Object msg) {
+        @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received unordered cache communication message [nodeId=" + nodeId +
                     ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
@@ -206,6 +207,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                         log.debug(msg0.toString());
                     }
 
+                    if (plc == GridIoPolicy.UTILITY_CACHE_POOL)
+                        rmtAffVer = new AffinityTopologyVersion(cctx.localNode().order());
+
                     fut = cctx.exchange().affinityReadyFuture(rmtAffVer);
                 }
             }
@@ -213,22 +217,27 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (fut != null && !fut.isDone()) {
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> t) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                            @Override public void run() {
-                                IgniteLogger log = cacheMsg.messageLogger(cctx);
+                        try {
+                            cctx.kernalContext().pools().poolForPolicy(plc).execute(new Runnable() {
+                                @Override public void run() {
+                                    IgniteLogger log = cacheMsg.messageLogger(cctx);
 
-                                if (log.isDebugEnabled()) {
-                                    StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
-                                        "affinity topology version [");
+                                    if (log.isDebugEnabled()) {
+                                        StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
+                                            "affinity topology version [");
 
-                                    appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
+                                        appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
 
-                                    log.debug(msg0.toString());
-                                }
+                                        log.debug(msg0.toString());
+                                    }
 
-                                handleMessage(nodeId, cacheMsg);
-                            }
-                        });
+                                    handleMessage(nodeId, cacheMsg);
+                                }
+                            });
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
+                        }
                     }
                 });
 
@@ -1336,7 +1345,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         /** {@inheritDoc} */
         @SuppressWarnings({"CatchGenericClass", "unchecked"})
-        @Override public void onMessage(final UUID nodeId, Object msg) {
+        @Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
             if (log.isDebugEnabled())
                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 2c02f96..2f21614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2472,7 +2472,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private class DeadlockDetectionListener implements GridMessageListener {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
             unmarshall(nodeId, cacheMsg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 0764316..3586956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -96,7 +96,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
         srv.start(ctx);
 
         ctx.io().addMessageListener(TOPIC_TIME_SYNC, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert msg instanceof GridClockDeltaSnapshotMessage;
 
                 GridClockDeltaSnapshotMessage msg0 = (GridClockDeltaSnapshotMessage)msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..f0429bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -288,7 +288,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object obj) {
+            @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                 GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                 if (msg.data() == null && msg.dataBytes() != null) {
@@ -721,7 +721,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     private void registerMessageListener(GridContinuousHandler hnd) {
         if (hnd.orderedTopic() != null) {
             ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
+                @Override public void onMessage(UUID nodeId, Object obj, byte plc) {
                     GridContinuousMessage msg = (GridContinuousMessage)obj;
 
                     // Only notification can be ordered.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index fee4dd6..6f35a52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -80,7 +80,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
         if (!ctx.clientNode()) {
             ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     assert msg instanceof DataStreamerRequest;
 
                     processRequest(nodeId, (DataStreamerRequest)msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index bb9ffdd..515314e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -321,7 +321,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
 
         ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 assert msg instanceof DataStreamerResponse;
 
                 DataStreamerResponse res = (DataStreamerResponse)msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e534800..59e1b72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -153,7 +153,7 @@ public class IgfsDataManager extends IgfsManager {
         topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
 
         igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (msg instanceof IgfsBlocksMessage)
                     processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
                 else if (msg instanceof IgfsAckMessage)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 2e82f33..f76b877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerResponse) {
                 IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
 
@@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof IgfsFragmentizerRequest ||
                 msg instanceof IgfsSyncMessage) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index ea9cbd7..eb3300c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -444,7 +444,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridJobSiblingsResponse res = null;
 
@@ -1842,7 +1842,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobSessionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1858,7 +1858,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobCancelListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 
@@ -1876,7 +1876,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     private class JobExecutionListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert nodeId != null;
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 947435c..88f8d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         super(ctx);
 
         ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!(msg instanceof GridTaskResultRequest)) {
                     U.warn(log, "Received unexpected message instead of task result request: " + msg);
 
@@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
         final Condition cond = lock.newCondition();
 
         GridMessageListener msgLsnr = new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 String err = null;
                 GridTaskResultResponse res = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9356864..7680e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1223,7 +1223,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof GridJobExecuteResponse)
                 processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg);
             else if (jobResOnly)
@@ -1267,7 +1267,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class JobSiblingsMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (!(msg instanceof GridJobSiblingsRequest)) {
                 U.warn(log, "Received unexpected message instead of siblings request: " + msg);
 
@@ -1339,7 +1339,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
      */
     private class TaskCancelMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg != null;
 
             if (!(msg instanceof GridTaskCancelRequest)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index f778bfc..fca2498 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -547,7 +547,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi
 
         spiCtx.addMessageListener(
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     MessageInfo info = rcvMsgMap.get(nodeId);
 
                     if (info == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 2aed459..17ca1a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
 
                     blockedMsgs.add(new T2<>(node, ioMsg));
 
+                    this.notifyAll();
+
                     return;
                 }
             }
@@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @param cls Message class.
+     * @param nodeName Node name.
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitForBlocked(Class<?> cls, String nodeName) throws InterruptedException {
+        synchronized (this) {
+            while (!hasMessage(cls, nodeName))
+                wait();
+        }
+    }
+
+    /**
+     * @param cls Message class.
+     * @param nodeName Node name.
+     * @return {@code True} if has blocked message.
+     */
+    private boolean hasMessage(Class<?> cls, String nodeName) {
+        for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+            if (msg.get2().message().getClass() == cls &&
+                nodeName.equals(msg.get1().attribute(ATTR_GRID_NAME)))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param blockP Message block predicate.
      */
     public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
index 7613543..9289f86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java
@@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac
         }
 
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             // No-op.
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8503b48..08ee347 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -121,7 +121,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 latch.countDown();
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index 2cb4a00..47d6092 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -226,7 +226,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
                 ((IgniteKernal)g).context().io().addMessageListener(
                     TOPIC_CACHE,
                     new GridMessageListener() {
-                        @Override public void onMessage(UUID nodeId, Object msg) {
+                        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                             info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
                             if (msg instanceof GridNearSingleGetRequest) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java
new file mode 100644
index 0000000..f919ca7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteBinaryMetadataUpdateFromInvoke.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteBinaryMetadataUpdateFromInvoke extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setMarshaller(null);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration("cache");
+
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setConsistentId(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMetadataUpdateFromAtomicInvoke() throws Exception {
+        for (int i = 0; i < 2; i++) {
+            client = false;
+
+            log.info("Iteration: " + i);
+
+            final int SRVS = 4;
+
+            startGrids(SRVS);
+
+            awaitPartitionMapExchange();
+
+            final CyclicBarrier b = new CyclicBarrier(3);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            final int META_PRIMARY_NODE = 0;
+            final int META_UPDATE_FROM_NODE = 1;
+
+            BinaryMarshaller marsh = (BinaryMarshaller)ignite(META_PRIMARY_NODE).configuration().getMarshaller();
+
+            marsh.getContext().registerClass(
+                marsh.binaryMarshaller().context().typeId(TestEnum1.class.getName()), TestEnum1.class);
+
+            TestRecordingCommunicationSpi testSpi =
+                (TestRecordingCommunicationSpi)ignite(META_UPDATE_FROM_NODE).configuration().getCommunicationSpi();
+
+            testSpi.blockMessages(GridNearTxPrepareRequest.class, getTestGridName(META_PRIMARY_NODE));
+
+            IgniteInternalFuture<?> sFut1 = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCache<Object, Object> cache = ignite(META_UPDATE_FROM_NODE).cache("cache");
+
+                    List<Integer> keys = primaryKeys(cache, 1);
+
+                    b.await();
+
+                    cache.invoke(keys.get(0), new TestEntryProcessor());
+
+                    return null;
+                }
+            }, "async-node-0");
+
+            IgniteInternalFuture<?> sFut2 = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCache<Object, Object> cache = ignite(META_PRIMARY_NODE).cache("cache");
+
+                    List<Integer> keys = primaryKeys(cache, 1);
+
+                    b.await();
+
+                    Thread.sleep(2000);
+
+                    cache.invoke(keys.get(0), new TestEntryProcessor());
+
+                    return null;
+                }
+            }, "async-node-1");
+
+            IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    client = true;
+
+                    b.await();
+
+                    for (int i = 0; i < 1; i++)
+                        startGrid(SRVS + i);
+
+                    stop.set(true);
+
+                    return null;
+                }
+            });
+
+            testSpi.waitForBlocked(GridNearTxPrepareRequest.class, getTestGridName(META_PRIMARY_NODE));
+
+            U.sleep(5000);
+
+            testSpi.stopBlock();
+
+            sFut1.get();
+            sFut2.get();
+            fut2.get();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            e.setValue(TestEnum1.ENUM);
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    enum TestEnum1 {
+        /** */
+        ENUM
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
index 723495c..293e578 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java
@@ -240,7 +240,7 @@ public class GridIoManagerBenchmark {
         GridMessageListener lsnr = new GridMessageListener() {
             private ClusterNode node;
 
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (node == null)
                     node = g.context().discovery().node(nodeId);
 
@@ -336,7 +336,7 @@ public class GridIoManagerBenchmark {
      */
     private static class SenderMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             msgCntr.increment();
 
             if (testLatency)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
index f2c6255..83efd7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java
@@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();
@@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 map.get(((GridTestMessage)msg).id()).countDown();
             }
         });
@@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
         rcv.addMessageListener(
             topic,
             new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     try {
                         rcv.send(sndNode, topic, (Message)msg, PUBLIC_POOL);
                     }
@@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest {
             });
 
         snd.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 msgCntr.increment();
 
                 sem.release();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 9c97542..2d04db2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
 
         mgr1.addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 try {
                     latch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 1c8acbc..0c04039 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -328,7 +328,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     @SuppressWarnings("deprecation")
     public void triggerMessage(ClusterNode node, Object msg) {
         for (GridMessageListener lsnr : msgLsnrs)
-            lsnr.onMessage(node.id(), msg);
+            lsnr.onMessage(node.id(), msg, (byte)0);
     }
 
     /** {@inheritDoc} */
@@ -667,7 +667,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
         @SuppressWarnings({
             "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions",
             "OverlyStrongTypeCast"})
-        @Override public void onMessage(UUID nodeId, Object msg) {
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
 
             ClusterNode node = locNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index 0513786..fb38c55 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.GridCachePutAllFailoverSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteBinaryMetadataUpdateFromInvoke;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest;
@@ -45,6 +46,7 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite {
         suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
 
         suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class);
+        suite.addTestSuite(IgniteBinaryMetadataUpdateFromInvoke.class);
 
         suite.addTestSuite(IgniteCacheGetRestartTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index c29239f..22b94c7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -138,7 +138,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName());
 
             msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg) {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                     GridSpinBusyLock l = desc.indexing().busyLock();
 
                     if (!l.enterBusy())

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..0605287 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -167,7 +167,7 @@ public class GridMapQueryExecutor {
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecd9d1c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3f886ee..71c0e12 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -190,7 +190,7 @@ public class GridReduceQueryExecutor {
         log = ctx.log(GridReduceQueryExecutor.class);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
 


Mime
View raw message