ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [4/5] ignite git commit: IGNITE-4157 Use discovery custom messages instead of marshaller cache - Fixes #1271.
Date Thu, 19 Jan 2017 12:05:44 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 18235d2..cb673d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -40,14 +40,11 @@ public class GridIoPolicy {
     /** Utility cache execution pool. */
     public static final byte UTILITY_CACHE_POOL = 5;
 
-    /** Marshaller cache execution pool. */
-    public static final byte MARSH_CACHE_POOL = 6;
-
     /** IGFS pool. */
-    public static final byte IGFS_POOL = 7;
+    public static final byte IGFS_POOL = 6;
 
     /** Pool for handling distributed index range requests. */
-    public static final byte IDX_POOL = 8;
+    public static final byte IDX_POOL = 7;
 
     /** Data streamer execution pool. */
     public static final byte DATA_STREAMER_POOL = 9;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..d15a87a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import java.io.Externalizable;
-import java.io.Serializable;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -104,6 +103,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -642,41 +643,40 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         });
 
         spi.setDataExchange(new DiscoverySpiDataExchange() {
-            @Override public Map<Integer, Serializable> collect(UUID nodeId) {
-                assert nodeId != null;
+            @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+                assert dataBag != null;
+                assert dataBag.joiningNodeId() != null;
 
-                Map<Integer, Serializable> data = new HashMap<>();
-
-                for (GridComponent comp : ctx.components()) {
-                    Serializable compData = comp.collectDiscoveryData(nodeId);
-
-                    if (compData != null) {
-                        assert comp.discoveryDataType() != null;
-
-                        data.put(comp.discoveryDataType().ordinal(), compData);
-                    }
+                if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
+                    for (GridComponent c : ctx.components())
+                        c.collectJoiningNodeData(dataBag);
+                }
+                else {
+                    for (GridComponent c : ctx.components())
+                        c.collectGridNodeData(dataBag);
                 }
 
-                return data;
+                return dataBag;
             }
 
-            @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
-                for (Map.Entry<Integer, Serializable> e : data.entrySet()) {
-                    GridComponent comp = null;
-
+            @Override public void onExchange(DiscoveryDataBag dataBag) {
+                if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
+                    // NodeAdded message reached the joining node after a round trip over the ring.
                     for (GridComponent c : ctx.components()) {
-                        if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
-                            comp = c;
-
-                            break;
-                        }
+                        if (c.discoveryDataType() != null)
+                            c.onGridDataReceived(dataBag.gridDiscoveryData(c.discoveryDataType().ordinal()));
                     }
+                }
+                else {
+                    // Discovery data from the newly joined node has to be applied to the local, already joined node.
+                    for (GridComponent c : ctx.components()) {
+                        if (c.discoveryDataType() != null) {
+                            JoiningNodeDiscoveryData data =
+                                    dataBag.newJoinerDiscoveryData(c.discoveryDataType().ordinal());
 
-                    if (comp != null)
-                        comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Received discovery data for unknown component: " + e.getKey());
+                            if (data != null)
+                                c.onJoiningNodeDataReceived(data);
+                        }
                     }
                 }
             }
@@ -1555,6 +1555,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         return discoCache().allNodes();
     }
 
+    /** @return All alive server nodes in topology. */
+    public Collection<ClusterNode> aliveSrvNodes() {
+        return discoCache().aliveSrvNodes();
+    }
+
     /** @return Full topology size. */
     public int size() {
         return discoCache().allNodes().size();
@@ -2538,6 +2543,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         /** Highest node order. */
         private final long maxOrder;
 
+        /** Alive server nodes */
+        private final Collection<ClusterNode> aliveSrvNodes;
+
         /**
          * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
          * #maskNull(String)} before passing raw cache names to it.
@@ -2589,6 +2597,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
             nodesByVer = new TreeMap<>();
 
+            List<ClusterNode> aliveSrvNodesList = new ArrayList<>(allNodes.size());
+
             long maxOrder0 = 0;
 
             Set<String> nearEnabledSet = new HashSet<>();
@@ -2640,8 +2650,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
-                if (hasCaches && alive(node.id()) && !CU.clientNode(node))
-                    aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
+                if (alive(node.id()) && !CU.clientNode(node)) {
+                    aliveSrvNodesList.add(node);
+
+                    if (hasCaches)
+                        aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
+                }
 
                 IgniteProductVersion nodeVer = U.productVersion(node);
 
@@ -2673,6 +2687,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             maxOrder = maxOrder0;
 
+            aliveSrvNodes = Collections.unmodifiableList(aliveSrvNodesList);
+
             allCacheNodes = Collections.unmodifiableMap(cacheMap);
             rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
             affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
@@ -2770,6 +2786,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
+         * Gets all alive server nodes.
+         */
+        Collection<ClusterNode> aliveSrvNodes() {
+            return aliveSrvNodes;
+        }
+
+        /**
          * Gets all remote nodes that have at least one cache configured.
          *
          * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index e4896fd..4b4aec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors;
 
-import java.io.Serializable;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -28,6 +26,9 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -79,12 +80,22 @@ public abstract class GridProcessorAdapter implements GridProcessor {
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        return null;
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
+    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
index 4886e61..c5855d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
 
@@ -38,12 +37,7 @@ public enum CacheType {
     /**
      * Internal replicated cache, should use separate thread pool.
      */
-    UTILITY(false, UTILITY_CACHE_POOL),
-
-    /**
-     * Internal marshaller cache, should use separate thread pool.
-     */
-    MARSHALLER(false, MARSH_CACHE_POOL);
+    UTILITY(false, UTILITY_CACHE_POOL);
 
     /** */
     private final boolean userCache;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e414160..1bd7442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -501,7 +501,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     @Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
         assert !CU.isUtilityCache(ctx.name());
         assert !CU.isAtomicsCache(ctx.name());
-        assert !CU.isMarshallerCache(ctx.name());
 
         CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9487589..c5725e7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -118,6 +118,9 @@ import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
@@ -138,6 +141,7 @@ import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
 import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
@@ -613,7 +617,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ctx.config().getCacheStoreSessionListenerFactories()));
 
         for (int i = 0; i < cfgs.length; i++) {
-            if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName()))
+            if (ctx.config().isDaemon())
                 continue;
 
             cloneCheckSerializable(cfgs[i]);
@@ -644,8 +648,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (CU.isUtilityCache(cfg.getName()))
                 cacheType = CacheType.UTILITY;
-            else if (CU.isMarshallerCache(cfg.getName()))
-                cacheType = CacheType.MARSHALLER;
             else if (internalCaches.contains(maskNull(cfg.getName())))
                 cacheType = CacheType.INTERNAL;
             else
@@ -768,7 +770,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             // Start dynamic caches received from collect discovery data.
             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName()))
+                if (ctx.config().isDaemon())
                     continue;
 
                 desc.clearRemoteConfigurations();
@@ -818,8 +820,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
-        ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
         if (!ctx.config().isDaemon())
             ctx.cacheObjects().onUtilityCacheStarted();
 
@@ -845,7 +845,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
-        assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
         assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
     }
 
@@ -985,7 +984,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean stopped;
 
-            boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
 
             if (!sysCache) {
                 DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name));
@@ -1890,12 +1889,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CACHE_PROC;
+        return CACHE_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-        boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+    }
+
+    /**
+     * @param joiningNodeId Joining node id.
+     */
+    private Serializable getDiscoveryData(UUID joiningNodeId) {
+        boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
 
         // Collect dynamically started caches to a single object.
         Collection<DynamicCacheChangeRequest> reqs;
@@ -1907,178 +1918,225 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             clientNodesMap = U.newHashMap(caches.size());
 
-            for (GridCacheAdapter<?, ?> cache : caches.values()) {
-                DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
+            collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
+        }
+        else {
+            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
-                if (desc == null)
-                    continue;
+            clientNodesMap = ctx.discovery().clientNodesMap();
 
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
+            collectDataOnGridNode(reqs);
+        }
 
-                req.startCacheConfiguration(desc.cacheConfiguration());
+        DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
 
-                req.cacheType(desc.cacheType());
+        batch.clientNodes(clientNodesMap);
 
-                req.deploymentId(desc.deploymentId());
+        batch.clientReconnect(reconnect);
+
+        // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
+        batch.id(null);
+
+        return batch;
+    }
 
-                req.receivedFrom(desc.receivedFrom());
+    /**
+     * @param reqs requests.
+     */
+    private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
-                reqs.add(req);
+            req.startCacheConfiguration(desc.cacheConfiguration());
 
-                Boolean nearEnabled = cache.isNear();
+            req.cacheType(desc.cacheType());
 
-                Map<UUID, Boolean> map = U.newHashMap(1);
+            req.deploymentId(desc.deploymentId());
 
-                map.put(nodeId, nearEnabled);
+            req.receivedFrom(desc.receivedFrom());
 
-                clientNodesMap.put(cache.name(), map);
-            }
+            reqs.add(req);
         }
-        else {
-            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
-                req.startCacheConfiguration(desc.cacheConfiguration());
+            req.startCacheConfiguration(desc.cacheConfiguration());
 
-                req.cacheType(desc.cacheType());
+            req.template(true);
 
-                req.deploymentId(desc.deploymentId());
+            reqs.add(req);
+        }
+    }
 
-                req.receivedFrom(desc.receivedFrom());
+    /**
+     * @param reqs requests.
+     * @param clientNodesMap Client nodes map.
+     * @param nodeId Node id.
+     */
+    private void collectDataOnReconnectingNode(
+            Collection<DynamicCacheChangeRequest> reqs,
+            Map<String, Map<UUID, Boolean>> clientNodesMap,
+            UUID nodeId
+    ) {
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
 
-                reqs.add(req);
-            }
+            if (desc == null)
+                continue;
 
-            for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
 
-                req.startCacheConfiguration(desc.cacheConfiguration());
+            req.startCacheConfiguration(desc.cacheConfiguration());
 
-                req.template(true);
+            req.cacheType(desc.cacheType());
 
-                reqs.add(req);
-            }
+            req.deploymentId(desc.deploymentId());
 
-            clientNodesMap = ctx.discovery().clientNodesMap();
-        }
+            req.receivedFrom(desc.receivedFrom());
 
-        DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
+            reqs.add(req);
 
-        batch.clientNodes(clientNodesMap);
+            Boolean nearEnabled = cache.isNear();
 
-        batch.clientReconnect(reconnect);
+            Map<UUID, Boolean> map = U.newHashMap(1);
 
-        // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
-        batch.id(null);
+            map.put(nodeId, nearEnabled);
 
-        return batch;
+            clientNodesMap.put(cache.name(), map);
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
-        if (data instanceof DynamicCacheChangeBatch) {
-            DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
+    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+        if (data.hasJoiningNodeData()) {
+            Serializable joiningNodeData = data.joiningNodeData();
+            if (joiningNodeData instanceof DynamicCacheChangeBatch)
+                onDiscoDataReceived(
+                        data.joiningNodeId(),
+                        data.joiningNodeId(),
+                        (DynamicCacheChangeBatch) joiningNodeData);
+        }
+    }
 
-            if (batch.clientReconnect()) {
-                if (ctx.clientDisconnected()) {
-                    if (clientReconnectReqs == null)
-                        clientReconnectReqs = new LinkedHashMap<>();
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
 
-                    clientReconnectReqs.put(joiningNodeId, batch);
+        if (nodeSpecData != null) {
+            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
+                if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
+                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
 
-                    return;
+                    onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch);
                 }
+            }
+        }
+    }
 
-                processClientReconnectData(joiningNodeId, batch);
+    /**
+     * @param joiningNodeId Joining node id.
+     * @param rmtNodeId Remote node id.
+     * @param batch Batch.
+     */
+    private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch) {
+        if (batch.clientReconnect()) {
+            if (ctx.clientDisconnected()) {
+                if (clientReconnectReqs == null)
+                    clientReconnectReqs = new LinkedHashMap<>();
+
+                clientReconnectReqs.put(joiningNodeId, batch);
+
+                return;
             }
-            else {
-                for (DynamicCacheChangeRequest req : batch.requests()) {
-                    initReceivedCacheConfiguration(req);
 
-                    if (req.template()) {
-                        CacheConfiguration ccfg = req.startCacheConfiguration();
+            processClientReconnectData(joiningNodeId, batch);
+        }
+        else {
+            for (DynamicCacheChangeRequest req : batch.requests()) {
+                initReceivedCacheConfiguration(req);
+
+                if (req.template()) {
+                    CacheConfiguration ccfg = req.startCacheConfiguration();
 
-                        assert ccfg != null : req;
+                    assert ccfg != null : req;
 
-                        DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+                    DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
 
-                        if (existing == null) {
-                            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                    if (existing == null) {
+                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
                                 ctx,
                                 ccfg,
                                 req.cacheType(),
                                 true,
                                 req.deploymentId());
 
-                            registeredTemplates.put(maskNull(req.cacheName()), desc);
-                        }
-
-                        continue;
+                        registeredTemplates.put(maskNull(req.cacheName()), desc);
                     }
 
-                    DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+                    continue;
+                }
 
-                    if (req.start() && !req.clientStartOnly()) {
-                        CacheConfiguration ccfg = req.startCacheConfiguration();
+                DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
 
-                        if (existing != null) {
-                            if (joiningNodeId.equals(ctx.localNodeId())) {
-                                existing.receivedFrom(req.receivedFrom());
+                if (req.start() && !req.clientStartOnly()) {
+                    CacheConfiguration ccfg = req.startCacheConfiguration();
 
-                                existing.deploymentId(req.deploymentId());
-                            }
+                    if (existing != null) {
+                        if (joiningNodeId.equals(ctx.localNodeId())) {
+                            existing.receivedFrom(req.receivedFrom());
 
-                            if (existing.locallyConfigured()) {
-                                existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+                            existing.deploymentId(req.deploymentId());
+                        }
 
-                                ctx.discovery().setCacheFilter(
+                        if (existing.locallyConfigured()) {
+                            existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+
+                            ctx.discovery().setCacheFilter(
                                     req.cacheName(),
                                     ccfg.getNodeFilter(),
                                     ccfg.getNearConfiguration() != null,
                                     ccfg.getCacheMode());
-                            }
                         }
-                        else {
-                            assert req.cacheType() != null : req;
+                    }
+                    else {
+                        assert req.cacheType() != null : req;
 
-                            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
                                 ctx,
                                 ccfg,
                                 req.cacheType(),
                                 false,
                                 req.deploymentId());
 
-                            // Received statically configured cache.
-                            if (req.initiatingNodeId() == null)
-                                desc.staticallyConfigured(true);
+                        // Received statically configured cache.
+                        if (req.initiatingNodeId() == null)
+                            desc.staticallyConfigured(true);
 
-                            if (joiningNodeId.equals(ctx.localNodeId()))
-                                desc.receivedOnDiscovery(true);
+                        if (joiningNodeId.equals(ctx.localNodeId()))
+                            desc.receivedOnDiscovery(true);
 
-                            desc.receivedFrom(req.receivedFrom());
+                        desc.receivedFrom(req.receivedFrom());
 
-                            DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
+                        DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
 
-                            assert old == null : old;
+                        assert old == null : old;
 
-                            ctx.discovery().setCacheFilter(
+                        ctx.discovery().setCacheFilter(
                                 req.cacheName(),
                                 ccfg.getNodeFilter(),
                                 ccfg.getNearConfiguration() != null,
                                 ccfg.getCacheMode());
-                        }
                     }
                 }
+            }
 
-                if (!F.isEmpty(batch.clientNodes())) {
-                    for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
-                        String cacheName = entry.getKey();
+            if (!F.isEmpty(batch.clientNodes())) {
+                for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
+                    String cacheName = entry.getKey();
 
-                        for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
-                            ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
-                    }
+                    for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+                        ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
                 }
             }
         }
@@ -2098,7 +2156,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             String name = req.cacheName();
 
-            boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
 
             if (!sysCache) {
                 DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
@@ -3127,13 +3185,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @return Marshaller system cache.
-     */
-    public GridCacheAdapter<Integer, String> marshallerCache() {
-        return internalCache(CU.MARSH_CACHE_NAME);
-    }
-
-    /**
      * Gets utility cache.
      *
      * @return Utility cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 0f855fe..e694e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
             !cctx.config().isEagerTtl() ||
             CU.isAtomicsCache(cctx.name()) ||
-            CU.isMarshallerCache(cctx.name()) ||
             CU.isUtilityCache(cctx.name()) ||
             (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 969c41a..61a57f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -154,9 +154,6 @@ public class GridCacheUtils {
     /** Atomics system cache name. */
     public static final String ATOMICS_CACHE_NAME = "ignite-atomics-sys-cache";
 
-    /** Marshaller system cache name. */
-    public static final String MARSH_CACHE_NAME = "ignite-marshaller-sys-cache";
-
     /** */
     public static final String CACHE_MSG_LOG_CATEGORY = "org.apache.ignite.cache.msg";
 
@@ -1176,14 +1173,6 @@ public class GridCacheUtils {
 
     /**
      * @param cacheName Cache name.
-     * @return {@code True} if this is marshaller system cache.
-     */
-    public static boolean isMarshallerCache(String cacheName) {
-        return MARSH_CACHE_NAME.equals(cacheName);
-    }
-
-    /**
-     * @param cacheName Cache name.
      * @return {@code True} if this is utility system cache.
      */
     public static boolean isUtilityCache(String cacheName) {
@@ -1203,7 +1192,7 @@ public class GridCacheUtils {
      * @return {@code True} if system cache.
      */
     public static boolean isSystemCache(String cacheName) {
-        return isMarshallerCache(cacheName) || isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) ||
+        return isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) ||
             isAtomicsCache(cacheName);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index a00cf3e..976f05f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -62,6 +62,19 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         }
 
         if (topVer != null) {
+            try {
+                IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock());
+
+                if (err != null) {
+                    onDone(err);
+
+                    return;
+                }
+            }
+            finally {
+                topologyReadUnlock();
+            }
+
             tx.topologyVersion(topVer);
 
             cctx.mvcc().addFuture(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 85c01d9..a0ab0be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2367,7 +2367,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 },
                 new P1<IgniteInternalCache<?, ?>>() {
                     @Override public boolean apply(IgniteInternalCache<?, ?> c) {
-                        return !CU.MARSH_CACHE_NAME.equals(c.name()) && !CU.UTILITY_CACHE_NAME.equals(c.name()) &&
+                        return !CU.UTILITY_CACHE_NAME.equals(c.name()) &&
                             !CU.ATOMICS_CACHE_NAME.equals(c.name());
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 6500cf3..d1c8b2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cluster;
 
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -36,9 +37,12 @@ import org.apache.ignite.internal.util.GridTimerTask;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -98,11 +102,24 @@ public class ClusterProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CLUSTER_PROC;
+        return CLUSTER_PROC;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData());
+    }
+
+
+    /**
+     * @return Discovery data.
+     */
+    private Serializable getDiscoveryData() {
         HashMap<String, Object> map = new HashMap<>();
 
         map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get());
@@ -111,16 +128,36 @@ public class ClusterProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
-        if (joiningNodeId.equals(ctx.localNodeId())) {
-            Map<String, Object> map = (Map<String, Object>)data;
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
 
-            if (map != null && map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
-                notifyEnabled.set((Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS));
+        if (nodeSpecData != null) {
+            Boolean lstFlag = findLastFlag(nodeSpecData.values());
+
+            if (lstFlag != null)
+                notifyEnabled.set(lstFlag);
         }
     }
 
+
+    /**
+     * @param vals Node-specific discovery data values to search for the update notifier status flag.
+     */
+    private Boolean findLastFlag(Collection<Serializable> vals) {
+        Boolean flag = null;
+
+        for (Serializable ser : vals) {
+            if (ser != null) {
+                Map<String, Object> map = (Map<String, Object>) ser;
+
+                if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+                    flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS);
+            }
+        }
+
+        return flag;
+    }
+
     /** {@inheritDoc} */
     @Override public void onKernalStart() throws IgniteCheckedException {
         if (notifyEnabled.get()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..b0db510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -78,6 +78,9 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -85,6 +88,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -319,8 +323,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         });
 
-        ctx.marshallerContext().onContinuousProcessorStarted(ctx);
-
         ctx.cacheObjects().onContinuousProcessorStarted(ctx);
 
         ctx.service().onContinuousProcessorStarted(ctx);
@@ -403,36 +405,42 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CONTINUOUS_PROC;
+        return CONTINUOUS_PROC;
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        if (log.isDebugEnabled()) {
-            log.debug("collectDiscoveryData [node=" + nodeId +
-                ", loc=" + ctx.localNodeId() +
-                ", locInfos=" + locInfos +
-                ", clientInfos=" + clientInfos +
-                ']');
-        }
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
-        if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
-            Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
+        if (data != null)
+            dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data);
+    }
 
-            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
-                Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
-                for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
-                    cp.put(e0.getKey(), e0.getValue());
+        if (data != null)
+            dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data);
+    }
 
-                clientInfos0.put(e.getKey(), cp);
-            }
+    /**
+     * @param joiningNodeId Joining node id.
+     */
+    private Serializable getDiscoveryData(UUID joiningNodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("collectDiscoveryData [node=" + joiningNodeId +
+                    ", loc=" + ctx.localNodeId() +
+                    ", locInfos=" + locInfos +
+                    ", clientInfos=" + clientInfos +
+                    ']');
+        }
 
-            if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
-                Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
+        if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
+            Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = copyClientInfos(clientInfos);
 
-                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
-                    infos.put(e.getKey(), e.getValue());
+            if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+                Map<UUID, LocalRoutineInfo> infos = copyLocalInfos(locInfos);
 
                 clientInfos0.put(ctx.localNodeId(), infos);
             }
@@ -445,31 +453,75 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 LocalRoutineInfo info = e.getValue();
 
                 data.addItem(new DiscoveryDataItem(routineId,
-                    info.prjPred,
-                    info.hnd,
-                    info.bufSize,
-                    info.interval,
-                    info.autoUnsubscribe));
+                        info.prjPred,
+                        info.hnd,
+                        info.bufSize,
+                        info.interval,
+                        info.autoUnsubscribe));
             }
 
             return data;
         }
-
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
-        DiscoveryData data = (DiscoveryData)obj;
+    /**
+     * @param clientInfos Client infos.
+     */
+    private Map<UUID, Map<UUID, LocalRoutineInfo>> copyClientInfos(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+        Map<UUID, Map<UUID, LocalRoutineInfo>> res = U.newHashMap(clientInfos.size());
+
+        for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
+            Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
+
+            for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
+                cp.put(e0.getKey(), e0.getValue());
+
+            res.put(e.getKey(), cp);
+        }
 
+        return res;
+    }
+
+    /**
+     * @param locInfos Local infos.
+     */
+    private Map<UUID, LocalRoutineInfo> copyLocalInfos(Map<UUID, LocalRoutineInfo> locInfos) {
+        Map<UUID, LocalRoutineInfo> res = U.newHashMap(locInfos.size());
+
+        for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+            res.put(e.getKey(), e.getValue());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         if (log.isDebugEnabled()) {
-            log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
-                ", rmtNodeId=" + rmtNodeId +
-                ", loc=" + ctx.localNodeId() +
-                ", data=" + data +
-                ']');
+            log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
+                    ", loc=" + ctx.localNodeId() +
+                    ", data=" + data.joiningNodeData() +
+                    ']');
         }
 
+        if (data.hasJoiningNodeData())
+            onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+
+        if (nodeSpecData != null) {
+            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
+                onDiscoDataReceived((DiscoveryData) e.getValue());
+        }
+    }
+
+    /**
+     * @param data received discovery data.
+     */
+    private void onDiscoDataReceived(DiscoveryData data) {
         if (!ctx.isDaemon() && data != null) {
             for (DiscoveryDataItem item : data.items) {
                 try {
@@ -478,14 +530,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     // Register handler only if local node passes projection predicate.
                     if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
-                        !locInfos.containsKey(item.routineId))
+                            !locInfos.containsKey(item.routineId))
                         registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
-                            item.autoUnsubscribe, false);
+                                item.autoUnsubscribe, false);
 
                     if (!item.autoUnsubscribe)
                         // Register routine locally.
                         locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
-                            item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
+                                item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to register continuous handler.", e);
@@ -508,12 +560,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
                                 registerHandler(clientNodeId,
-                                    routineId,
-                                    info.hnd,
-                                    info.bufSize,
-                                    info.interval,
-                                    info.autoUnsubscribe,
-                                    false);
+                                        routineId,
+                                        info.hnd,
+                                        info.bufSize,
+                                        info.interval,
+                                        info.autoUnsubscribe,
+                                        false);
                             }
                         }
                         catch (IgniteCheckedException err) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
new file mode 100644
index 0000000..a361760
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future responsible for requesting missing marshaller mapping from one of available server nodes.
+ *
+ * Handles scenarios when server nodes leave cluster. If node that was requested for mapping leaves the cluster or fails,
+ * mapping is automatically requested from the next node available in topology.
+ */
+final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult> {
+    /** */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** */
+    private static IgniteLogger log;
+
+    /** */
+    private final GridIoManager ioMgr;
+
+    /** */
+    private final GridDiscoveryManager discoMgr;
+
+    /** */
+    private final MarshallerMappingItem item;
+
+    /** */
+    private final Map<MarshallerMappingItem, ClientRequestFuture> syncMap;
+
+    /** */
+    private final Queue<ClusterNode> aliveSrvNodes;
+
+    /** */
+    private ClusterNode pendingNode;
+
+    /**
+     * @param ctx Context.
+     * @param item Item.
+     * @param syncMap Sync map.
+     */
+    ClientRequestFuture(
+            GridKernalContext ctx,
+            MarshallerMappingItem item,
+            Map<MarshallerMappingItem, ClientRequestFuture> syncMap
+    ) {
+        ioMgr = ctx.io();
+        discoMgr = ctx.discovery();
+        aliveSrvNodes = new LinkedList<>(discoMgr.aliveSrvNodes());
+        this.item = item;
+        this.syncMap = syncMap;
+
+        if (log == null)
+            log = U.logger(ctx, logRef, ClientRequestFuture.class);
+    }
+
+    /**
+     * Requests the mapping from the next alive server node; fails the future if no server node is left.
+     */
+    void requestMapping() {
+        boolean noSrvsInCluster;
+
+        synchronized (this) {
+            while (!aliveSrvNodes.isEmpty()) {
+                ClusterNode srvNode = aliveSrvNodes.poll();
+
+                try {
+                    ioMgr.send(
+                            srvNode,
+                            GridTopic.TOPIC_MAPPING_MARSH,
+                            new MissingMappingRequestMessage(
+                                    item.platformId(),
+                                    item.typeId()),
+                            GridIoPolicy.SYSTEM_POOL);
+
+                    if (discoMgr.node(srvNode.id()) == null)
+                        continue;
+
+                    pendingNode = srvNode;
+
+                    break;
+                }
+                catch (IgniteCheckedException ignored) {
+                    U.warn(log,
+                            "Failed to request marshaller mapping from remote node (proceeding with the next one): "
+                                    + srvNode);
+                }
+            }
+
+            noSrvsInCluster = pendingNode == null;
+        }
+
+        if (noSrvsInCluster)
+            onDone(MappingExchangeResult.createFailureResult(
+                    new IgniteCheckedException(
+                            "All server nodes have left grid, cannot request mapping [platformId: "
+                                    + item.platformId()
+                                    + "; typeId: "
+                                    + item.typeId() + "]")));
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Mapping Request Result.
+     */
+    void onResponse(UUID nodeId, MappingExchangeResult res) {
+        MappingExchangeResult res0 = null;
+
+        synchronized (this) {
+            if (pendingNode != null && pendingNode.id().equals(nodeId))
+                res0 = res;
+        }
+
+        if (res0 != null)
+            onDone(res0);
+    }
+
+    /**
+     * If left node is actually the one latest mapping request was sent to,
+     * request is sent again to the next node in topology.
+     *
+     * @param leftNodeId Left node id.
+     */
+    void onNodeLeft(UUID leftNodeId) {
+        boolean reqAgain = false;
+
+        synchronized (this) {
+            if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
+                aliveSrvNodes.remove(pendingNode);
+
+                pendingNode = null;
+
+                reqAgain = true;
+            }
+        }
+
+        if (reqAgain)
+            requestMapping();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable MappingExchangeResult res, @Nullable Throwable err) {
+        assert res != null;
+
+        boolean done = super.onDone(res, err);
+
+        if (done)
+            syncMap.remove(item);
+
+        return done;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
new file mode 100644
index 0000000..7356e6c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+ * events for exchanging marshalling mappings between nodes in the grid.
+ *
+ * In particular it processes two flows:
+ * <ul>
+ *     <li>
+ *         Some node, server or client, wants to add a new mapping for some class.
+ *         In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+ *     </li>
+ *     <li>
+ *         As discovery events are delivered to clients asynchronously,
+ *         a client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+ *         In that situation the client sends a {@link MissingMappingRequestMessage} request
+ *         and the processor handles it as well as the {@link MissingMappingResponseMessage} message.
+ *     </li>
+ * </ul>
+ */
+public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+    /** Marshaller context taken from the kernal context; notified about every mapping event handled here. */
+    private final MarshallerContextImpl marshallerCtx;
+
+    /** Futures keyed by mapping item; completed by the proposed/accepted message listeners of this processor. */
+    private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+            = new ConcurrentHashMap8<>();
+
+    /** Client request futures keyed by mapping item; completed when a missing mapping response arrives. */
+    private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        marshallerCtx = ctx.marshallerContext();
+    }
+
+    /** {@inheritDoc} Hands a mapping transport to the marshaller context and registers the message listeners. */
+    @Override public void start() throws IgniteCheckedException {
+        GridDiscoveryManager discoMgr = ctx.discovery();
+        GridIoManager ioMgr = ctx.io();
+
+        MarshallerMappingTransport transport = new MarshallerMappingTransport(
+                ctx,
+                mappingExchangeSyncMap,
+                clientReqSyncMap
+        );
+        marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+
+        discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+
+        discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+
+        if (!ctx.clientNode()) // Non-client nodes answer missing mapping requests.
+            ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+        else // Client nodes listen for the responses on the same topic.
+            ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+
+        if (ctx.clientNode()) // Clients also tell pending request futures about nodes that left or failed.
+            ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+                    if (!ctx.isStopping()) {
+                        for (ClientRequestFuture fut : clientReqSyncMap.values())
+                            fut.onNodeLeft(evt0.eventNode().id());
+                    }
+                }
+            }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+    /**
+     * Handles {@link MissingMappingRequestMessage}: resolves the requested mapping and sends the answer to the sender.
+     */
+    private final class MissingMappingRequestListener implements GridMessageListener {
+        /** IO manager used to send the response. */
+        private final GridIoManager ioMgr;
+
+        /**
+         * @param ioMgr Io manager.
+         */
+        MissingMappingRequestListener(GridIoManager ioMgr) {
+            this.ioMgr = ioMgr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            assert msg instanceof MissingMappingRequestMessage : msg;
+
+            MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg;
+
+            byte platformId = msg0.platformId();
+            int typeId = msg0.typeId();
+
+            String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId);
+
+            try { // The response is sent even if the name is null; the response listener treats null as a failure.
+                ioMgr.send(
+                        nodeId,
+                        TOPIC_MAPPING_MARSH,
+                        new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
+                        SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send missing mapping response.", e);
+            }
+        }
+    }
+
+    /**
+     * Handles {@link MissingMappingResponseMessage} by completing the matching future from {@code clientReqSyncMap}.
+     */
+    private final class MissingMappingResponseListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            assert msg instanceof MissingMappingResponseMessage : msg;
+
+            MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg;
+
+            byte platformId = msg0.platformId();
+            int typeId = msg0.typeId();
+            String resolvedClsName = msg0.className();
+
+            // NOTE(review): lookup key has no class name; presumably item equality ignores className - confirm.
+            MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
+
+            GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item);
+
+            if (fut != null) { // Responses without a registered future are ignored.
+                if (resolvedClsName != null) {
+                    marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+
+                    fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
+                }
+                else
+                    fut.onDone(MappingExchangeResult.createFailureResult(
+                            new IgniteCheckedException(
+                                    "Failed to resolve mapping [platformId: "
+                                            + platformId
+                                            + ", typeId: "
+                                            + typeId + "]")));
+            }
+        }
+    }
+
+    /**
+     * Handles {@link MappingProposedMessage}: flags duplicates/conflicts and fails the local future on a conflict.
+     */
+    private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+        /** {@inheritDoc} */
+        @Override public void onCustomEvent(
+                AffinityTopologyVersion topVer,
+                ClusterNode snd,
+                MappingProposedMessage msg
+        ) {
+            if (!ctx.isStopping()) {
+                if (msg.duplicated()) // Already flagged as a duplicate: nothing more to do.
+                    return;
+
+                if (!msg.inConflict()) {
+                    MarshallerMappingItem item = msg.mappingItem();
+                    String conflictingName = marshallerCtx.onMappingProposed(item);
+
+                    if (conflictingName != null) {
+                        if (conflictingName.equals(item.className())) // Same class name returned: a duplicate.
+                            msg.markDuplicated();
+                        else // A different class name was returned for this item: a conflict.
+                            msg.conflictingWithClass(conflictingName);
+                    }
+                }
+                else {
+                    UUID origNodeId = msg.origNodeId();
+
+                    if (origNodeId.equals(ctx.localNodeId())) { // Only the originating node fails its future.
+                        GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem());
+
+                        assert fut != null: msg; // NOTE(review): with assertions off a missing future is an NPE below.
+
+                        fut.onDone(MappingExchangeResult.createFailureResult(
+                                duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param mappingItem Mapping item whose platform ID, type ID and class name go into the message.
+         * @param conflictingClsName Conflicting class name, reported as {@code oldCls}.
+         */
+        private IgniteCheckedException duplicateMappingException(
+                MarshallerMappingItem mappingItem,
+                String conflictingClsName
+        ) {
+            return new IgniteCheckedException("Duplicate ID [platformId="
+                    + mappingItem.platformId()
+                    + ", typeId="
+                    + mappingItem.typeId()
+                    + ", oldCls="
+                    + conflictingClsName
+                    + ", newCls="
+                    + mappingItem.className() + "]");
+        }
+    }
+
+    /**
+     * Handles {@link MappingAcceptedMessage}: passes the item to the marshaller context and completes its future, if any.
+     */
+    private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+        /** {@inheritDoc} */
+        @Override public void onCustomEvent(
+                AffinityTopologyVersion topVer,
+                ClusterNode snd,
+                MappingAcceptedMessage msg
+        ) {
+            MarshallerMappingItem item = msg.getMappingItem();
+            marshallerCtx.onMappingAccepted(item);
+
+            GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
+
+            if (fut != null) // A future exists only if one was registered in the map for this item.
+                fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal())) // Add the cached mappings only once.
+            dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData();
+
+        if (mappings != null) {
+            for (int i = 0; i < mappings.size(); i++) { // Index is passed on as a byte; presumably the platform ID.
+                Map<Integer, MappedName> map;
+
+                if ((map = mappings.get(i)) != null)
+                    marshallerCtx.onMappingDataReceived((byte) i, map);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+                ctx.cluster().clientReconnectFuture(),
+                "Failed to propose or request mapping, client node disconnected.")));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        marshallerCtx.onMarshallerProcessorStop();
+
+        cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return MARSHALLER_PROC;
+    }
+
+    /**
+     * @param res Result every future currently held in both sync maps is completed with.
+     */
+    private void cancelFutures(MappingExchangeResult res) {
+        for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values())
+            fut.onDone(res);
+
+        for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values())
+            fut.onDone(res);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
new file mode 100644
index 0000000..c13c48e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Contains a mapped class name and a boolean flag showing whether this mapping was accepted by other nodes or not.
+ */
+public final class MappedName implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Mapped class name. */
+    private final String clsName;
+
+    /** Flag showing whether the mapping was accepted. */
+    private final boolean accepted;
+
+    /**
+     * @param clsName Class name.
+     * @param accepted Whether the mapping was accepted.
+     */
+    public MappedName(String clsName, boolean accepted) {
+        this.clsName = clsName;
+        this.accepted = accepted;
+    }
+
+    /**
+     * @return Mapped class name as passed to the constructor.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @return Accepted flag as passed to the constructor.
+     */
+    public boolean accepted() {
+        return accepted;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MappedName.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
new file mode 100644
index 0000000..23c2858
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sent as an acknowledgement of a successfully proposed new mapping (see {@link MappingProposedMessage}).
+ *
+ * If any nodes were waiting for this mapping to be accepted, they will be unblocked on receiving this message.
+ */
+public class MappingAcceptedMessage implements DiscoveryCustomMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly when the message object is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Mapping item carried by this message. */
+    private final MarshallerMappingItem item;
+
+    /**
+     * @param item Mapping item to carry in this message.
+     */
+    MappingAcceptedMessage(MarshallerMappingItem item) {
+        this.item = item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} This message has no acknowledgement of its own, so {@code null} is always returned. */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** @return Mapping item carried by this message. */
+    MarshallerMappingItem getMappingItem() {
+        return item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MappingAcceptedMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
new file mode 100644
index 0000000..4bc1442
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Result of a mapping exchange: success with an accepted class name, failure with an error, or exchange disabled.
+ */
+public class MappingExchangeResult {
+    /** Accepted class name; the factory methods set it only for successful results. */
+    private final String acceptedClsName;
+
+    /** Error; the factory methods set it only for failure results. */
+    private final IgniteCheckedException error;
+
+    /** Type of this result. */
+    private final ResultType resType;
+
+    /** Possible result types. */
+    private enum ResultType {
+        /** Result carries an accepted class name. */
+        SUCCESS,
+
+        /** Result carries an error. */
+        FAILURE,
+
+        /** Result carries neither a class name nor an error. */
+        EXCHANGE_DISABLED
+    }
+
+    /** Private: instances are created through the static factory methods of this class.
+     */
+    private MappingExchangeResult(ResultType resType, String acceptedClsName, IgniteCheckedException error) {
+        this.resType = resType;
+        this.acceptedClsName = acceptedClsName;
+        this.error = error;
+    }
+
+    /** @return Accepted class name, or {@code null} for results created without one. */
+    public String className() {
+        return acceptedClsName;
+    }
+
+    /** @return Error, or {@code null} for results created without one. */
+    public IgniteCheckedException error() {
+        return error;
+    }
+
+    /** @return {@code true} if the result type is {@code SUCCESS}. */
+    public boolean successful() {
+        return resType == ResultType.SUCCESS;
+    }
+
+    /** @return {@code true} if the result type is {@code EXCHANGE_DISABLED}. */
+    public boolean exchangeDisabled() {
+        return resType == ResultType.EXCHANGE_DISABLED;
+    }
+
+    /**
+     * @param acceptedClsName Accepted class name; asserted to be non-null.
+     */
+    static MappingExchangeResult createSuccessfulResult(String acceptedClsName) {
+        assert acceptedClsName != null;
+
+        return new MappingExchangeResult(ResultType.SUCCESS, acceptedClsName, null);
+    }
+
+    /**
+     * @param error Error; asserted to be non-null.
+     */
+    static MappingExchangeResult createFailureResult(IgniteCheckedException error) {
+        assert error != null;
+
+        return new MappingExchangeResult(ResultType.FAILURE, null, error);
+    }
+
+    /** @return Result of type {@code EXCHANGE_DISABLED} with no class name and no error. */
+    static MappingExchangeResult createExchangeDisabledResult() {
+        return new MappingExchangeResult(ResultType.EXCHANGE_DISABLED, null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
new file mode 100644
index 0000000..33a2168
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A node sends this message when it wants to propose a new marshaller mapping and to ensure that there are no conflicts
+ * with this mapping on other nodes in the cluster.
+ *
+ * After sending this message to the cluster, the sending node gets blocked until the mapping is either accepted or rejected.
+ *
+ * When it completes a pass around the cluster ring with no conflicts observed,
+ * {@link MappingAcceptedMessage} is sent as an acknowledgement that everything is fine.
+ */
+public class MappingProposedMessage implements DiscoveryCustomMessage {
+    /** Status of the proposal carried by this message. */
+    private enum ProposalStatus {
+        /** Initial status; an acknowledgement message is produced only in this status. */
+        SUCCESSFUL,
+        /** Set together with the conflicting class name by {@link #conflictingWithClass(String)}. */
+        IN_CONFLICT,
+        /** Set by {@link #markDuplicated()}. */
+        DUPLICATED
+    }
+
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly when the message object is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Originating node ID; asserted to be non-null in the constructor. */
+    private final UUID origNodeId;
+
+    /** Proposed mapping item. */
+    @GridToStringInclude
+    private final MarshallerMappingItem mappingItem;
+
+    /** Current proposal status; starts as {@code SUCCESSFUL}. */
+    private ProposalStatus status = ProposalStatus.SUCCESSFUL;
+
+    /** Conflicting class name; assigned only by {@link #conflictingWithClass(String)}. */
+    private String conflictingClsName;
+
+    /**
+     * @param mappingItem Mapping item.
+     * @param origNodeId Orig node id; asserted to be non-null.
+     */
+    MappingProposedMessage(MarshallerMappingItem mappingItem, UUID origNodeId) {
+        assert origNodeId != null;
+
+        this.mappingItem = mappingItem;
+        this.origNodeId = origNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * {@inheritDoc} Returns a new {@link MappingAcceptedMessage} only while the status is {@code SUCCESSFUL}.
+     */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        if (status == ProposalStatus.SUCCESSFUL)
+            return new MappingAcceptedMessage(mappingItem);
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} Always {@code true}: the status and conflicting class name can be changed after creation. */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** @return Proposed mapping item. */
+    MarshallerMappingItem mappingItem() {
+        return mappingItem;
+    }
+
+    /** @return Originating node ID passed to the constructor. */
+    UUID origNodeId() {
+        return origNodeId;
+    }
+
+    /** @return {@code true} if the status is {@code IN_CONFLICT}. */
+    boolean inConflict() {
+        return status == ProposalStatus.IN_CONFLICT;
+    }
+
+    /** @return {@code true} if the status is {@code DUPLICATED}. */
+    public boolean duplicated() {
+        return status == ProposalStatus.DUPLICATED;
+    }
+
+    /** Switches the status to {@code IN_CONFLICT} and stores {@code conflClsName} as the conflicting class name. */
+    void conflictingWithClass(String conflClsName) {
+        status = ProposalStatus.IN_CONFLICT;
+        conflictingClsName = conflClsName;
+    }
+
+    /** Switches the status to {@code DUPLICATED}. */
+    void markDuplicated() {
+        status = ProposalStatus.DUPLICATED;
+    }
+
+    /** @return Conflicting class name, or {@code null} if {@link #conflictingWithClass(String)} was never called. */
+    String conflictingClassName() {
+        return conflictingClsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MappingProposedMessage.class, this);
+    }
+}


Mime
View raw message