ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [28/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 505204d..f33fa39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.interop.*;
 import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
@@ -131,41 +131,91 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         lsnr = new GridLocalEventListener() {
+            /** node ID, routine ID, event */
+            private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
+
+            private boolean notificationInProgress;
+
             @Override public void onEvent(Event evt) {
-                if (filter == null || filter.apply(evt)) {
-                    if (loc) {
-                        if (!cb.apply(nodeId, evt))
-                            ctx.continuous().stopRoutine(routineId);
-                    }
-                    else {
-                        GridDiscoveryManager disco = ctx.discovery();
+                if (filter != null && !filter.apply(evt))
+                    return;
+
+                if (loc) {
+                    if (!cb.apply(nodeId, evt))
+                        ctx.continuous().stopRoutine(routineId);
+                }
+                else {
+                    if (ctx.discovery().node(nodeId) == null)
+                        return;
+
+                    synchronized (notificationQueue) {
+                        notificationQueue.add(new T3<>(nodeId, routineId, evt));
+
+                        if (!notificationInProgress) {
+                            ctx.getSystemExecutorService().submit(new Runnable() {
+                                @Override public void run() {
+                                    if (!ctx.continuous().lockStopping())
+                                        return;
 
-                        ClusterNode node = disco.node(nodeId);
+                                    try {
+                                        while (true) {
+                                            T3<UUID, UUID, Event> t3;
 
-                        if (node != null) {
-                            try {
-                                EventWrapper wrapper = new EventWrapper(evt);
+                                            synchronized (notificationQueue) {
+                                                t3 = notificationQueue.poll();
 
-                                if (evt instanceof CacheEvent) {
-                                    String cacheName = ((CacheEvent)evt).cacheName();
+                                                if (t3 == null) {
+                                                    notificationInProgress = false;
 
-                                    if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
-                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+                                                    return;
+                                                }
+                                            }
 
-                                        wrapper.cacheName = cacheName;
+                                            try {
+                                                Event evt = t3.get3();
 
-                                        GridCacheDeploymentManager depMgr =
-                                            ctx.cache().internalCache(cacheName).context().deploy();
+                                                EventWrapper wrapper = new EventWrapper(evt);
 
-                                        depMgr.prepare(wrapper);
+                                                if (evt instanceof CacheEvent) {
+                                                    String cacheName = ((CacheEvent)evt).cacheName();
+
+                                                    ClusterNode node = ctx.discovery().node(t3.get1());
+
+                                                    if (node == null)
+                                                        continue;
+
+                                                    if (ctx.config().isPeerClassLoadingEnabled()
+                                                        && ctx.discovery().cacheNode(node, cacheName)) {
+                                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+
+                                                        wrapper.cacheName = cacheName;
+
+                                                        GridCacheDeploymentManager depMgr = ctx.cache()
+                                                            .internalCache(cacheName).context().deploy();
+
+                                                        depMgr.prepare(wrapper);
+                                                    }
+                                                }
+
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
+                                                    false);
+                                            }
+                                            catch (ClusterTopologyCheckedException ignored) {
+                                                // No-op.
+                                            }
+                                            catch (Throwable e) {
+                                                U.error(ctx.log(GridEventConsumeHandler.class),
+                                                    "Failed to send event notification to node: " + nodeId, e);
+                                            }
+                                        }
+                                    }
+                                    finally {
+                                        ctx.continuous().unlockStopping();
                                     }
                                 }
+                            });
 
-                                ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
-                            }
+                            notificationInProgress = true;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ad7d562..d6542f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Marshaller context.
      */
     public MarshallerContextImpl marshallerContext();
+
+    /**
+     * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+     */
+    public boolean clientNode();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1ff483e..f921d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public boolean clientNode() {
+        return cfg.isClientMode();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridKernalContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c4b93b8..4f5e365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.job.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
+import org.apache.ignite.internal.processors.nodevalidation.*;
 import org.apache.ignite.internal.processors.offheap.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.port.*;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.processors.segmentation.*;
 import org.apache.ignite.internal.processors.service.*;
 import org.apache.ignite.internal.processors.session.*;
-import org.apache.ignite.internal.processors.nodevalidation.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -169,11 +169,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /** */
     @GridToStringExclude
-    private Timer starveTimer;
+    private GridTimeoutProcessor.CancelableTask starveTask;
 
     /** */
     @GridToStringExclude
-    private Timer metricsLogTimer;
+    private GridTimeoutProcessor.CancelableTask metricsLogTask;
 
     /** Indicate error on grid stop. */
     @GridToStringExclude
@@ -867,13 +867,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            starveTimer = new Timer("ignite-starvation-checker");
-
-            starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+            starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (!(execSvc instanceof ThreadPoolExecutor))
                         return;
 
@@ -896,13 +894,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            metricsLogTimer = new Timer("ignite-metrics-logger");
-
-            metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
-                /** */
+            metricsLogTask = ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (log.isInfoEnabled()) {
                         ClusterMetrics m = cluster().localNode().metrics();
 
@@ -963,8 +958,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                             sysPoolQSize = exec.getQueue().size();
                         }
 
+                        String id = U.id8(localNode().id());
+
                         String msg = NL +
                             "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+                            "    ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
                             "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
                             "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
                                 dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
@@ -1165,6 +1163,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         add(ATTR_CLIENT_MODE, cfg.isClientMode());
 
+        add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
+
         // Build a string from JVM arguments, because parameters with spaces are split.
         SB jvmArgs = new SB(512);
 
@@ -1550,7 +1550,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     ">>> Grid name: " + gridName + NL +
                     ">>> Local node [" +
                     "ID=" + locNode.id().toString().toUpperCase() +
-                    ", order=" + locNode.order() +
+                    ", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() +
                     "]" + NL +
                     ">>> Local node addresses: " + U.addressesAsString(locNode) + NL +
                     ">>> Local ports: " + sb + NL;
@@ -1713,12 +1713,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
-            if (starveTimer != null)
-                starveTimer.cancel();
+            if (starveTask != null)
+                starveTask.close();
 
-            // Cancel metrics log timer.
-            if (metricsLogTimer != null)
-                metricsLogTimer.cancel();
+            if (metricsLogTask != null)
+                metricsLogTask.close();
 
             boolean interrupted = false;
 
@@ -2370,7 +2369,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get();
 
-            return ctx.cache().publicJCache(cacheName);
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+            checkNearCacheStarted(cache);
+
+            return cache;
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -2397,7 +2400,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
             }
 
-            return ctx.cache().publicJCache(cacheName);
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+            checkNearCacheStarted(cache);
+
+            return cache;
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -2407,6 +2414,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     * @param cache Cache.
+     */
+    private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) {
+        if (!cache.context().isNear())
+            throw new IgniteException("Failed to start near cache " +
+                "(a cache with the same name without near cache is already started)");
+    }
+
     /** {@inheritDoc} */
     @Override public void destroyCache(String cacheName) {
         guard();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 98cc3a7..928db5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -126,9 +126,12 @@ public final class IgniteNodeAttributes {
     /** Security subject for authenticated node. */
     public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject";
 
-    /** Cache interceptors. */
+    /** Client mode flag. */
     public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client";
 
+    /** Configuration consistency check disabled flag. */
+    public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index d54e06f..5cbe377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -121,11 +121,7 @@ public class IgnitionEx {
     };
 
     /** */
-    private static ThreadLocal<Boolean> clientMode = new ThreadLocal<Boolean>() {
-        @Override protected Boolean initialValue() {
-            return null;
-        }
-    };
+    private static ThreadLocal<Boolean> clientMode = new ThreadLocal<>();
 
     /**
      * Checks runtime version to be 1.7.x or 1.8.x.
@@ -196,7 +192,7 @@ public class IgnitionEx {
      * @return Client mode flag.
      */
     public static boolean isClientMode() {
-        return clientMode.get();
+        return clientMode.get() == null ? false : clientMode.get();
     }
 
     /**
@@ -1458,8 +1454,9 @@ public class IgnitionEx {
                 DFLT_PUBLIC_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
 
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+            if (!myCfg.isClientMode())
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1471,7 +1468,7 @@ public class IgnitionEx {
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
             // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+            ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -1764,20 +1761,14 @@ public class IgnitionEx {
         public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
             List<CacheConfiguration> cacheCfgs = new ArrayList<>();
 
-            boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi;
-
-            // Add marshaller and utility caches.
-            if (!clientDisco) {
-                cacheCfgs.add(marshallerSystemCache());
+            cacheCfgs.add(marshallerSystemCache());
 
-                cacheCfgs.add(utilitySystemCache());
-            }
+            cacheCfgs.add(utilitySystemCache());
 
             if (IgniteComponentType.HADOOP.inClassPath())
                 cacheCfgs.add(CU.hadoopSystemCache());
 
-            if (cfg.getAtomicConfiguration() != null && !clientDisco)
-                cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
+            cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
 
             CacheConfiguration[] userCaches = cfg.getCacheConfiguration();
 
@@ -1854,7 +1845,7 @@ public class IgnitionEx {
             if (cfg.getSwapSpaceSpi() == null) {
                 boolean needSwap = false;
 
-                if (cfg.getCacheConfiguration() != null) {
+                if (cfg.getCacheConfiguration() != null && !Boolean.TRUE.equals(cfg.isClientMode())) {
                     for (CacheConfiguration c : cfg.getCacheConfiguration()) {
                         if (c.isSwapEnabled()) {
                             needSwap = true;
@@ -2005,7 +1996,6 @@ public class IgnitionEx {
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setCacheMode(cfg.getCacheMode());
             ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
-            ccfg.setNearConfiguration(new NearCacheConfiguration());
 
             if (cfg.getCacheMode() == PARTITIONED)
                 ccfg.setBackups(cfg.getBackups());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
index e7ee2ff..2ec9825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
 import org.jsr166.*;
@@ -52,10 +53,29 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
 
             Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE);
 
-            while (urls.hasMoreElements())
+            boolean foundClsNames = false;
+
+            while (urls.hasMoreElements()) {
                 processResource(urls.nextElement());
 
-            processResource(ldr.getResource(JDK_CLS_NAMES_FILE));
+                foundClsNames = true;
+            }
+
+            if (!foundClsNames)
+                throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+                    "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+            URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE);
+
+            if (jdkClsNames == null)
+                throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+                    "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+            processResource(jdkClsNames);
+
+            checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
+            checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE);
+            checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
         }
         catch (IOException e) {
             throw new IllegalStateException("Failed to initialize marshaller context.", e);
@@ -63,6 +83,18 @@ public abstract class MarshallerContextAdapter implements MarshallerContext {
     }
 
     /**
+     * @param clsName Class name.
+     * @param ldr Class loader used to get properties file.
+     * @param fileName File name.
+     */
+    private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
+        if (!map.containsKey(clsName.hashCode()))
+            throw new IgniteException("Failed to read class name from class names properties file. " +
+                "Make sure class names properties file packaged with ignite binaries is not corrupted " +
+                "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']');
+    }
+
+    /**
      * @param url Resource URL.
      * @throws IOException In case of error.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index f964bec..4b0251d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -59,7 +59,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
         ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
             new ContinuousQueryListener(log, workDir),
             null,
-            true,
+            ctx.cache().marshallerCache().context().affinityNode(),
             true
         );
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
index ee32692..779b54d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
@@ -22,19 +22,17 @@ import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
-import java.io.*;
-
 /**
  * Custom event.
  */
 public class DiscoveryCustomEvent extends DiscoveryEvent {
     /** */
     private static final long serialVersionUID = 0L;
-    
+
     /**
      * Built-in event type: custom event sent.
      * <br>
-     * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}.
+     * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}.
      * <p>
      *
      * @see DiscoveryCustomEvent
@@ -42,7 +40,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
     public static final int EVT_DISCOVERY_CUSTOM_EVT = 18;
 
     /** */
-    private Serializable data;
+    private DiscoveryCustomMessage customMsg;
 
     /** Affinity topology version. */
     private AffinityTopologyVersion affTopVer;
@@ -57,15 +55,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
     /**
      * @return Data.
      */
-    public Serializable data() {
-        return data;
+    public DiscoveryCustomMessage customMessage() {
+        return customMsg;
     }
 
     /**
-     * @param data New data.
+     * @param customMsg New customMessage.
      */
-    public void data(Serializable data) {
-        this.data = data;
+    public void customMessage(DiscoveryCustomMessage customMsg) {
+        this.customMsg = customMsg;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
     }
 
     /**
+     * Serializes the message and sends it into the given output stream.
      * @param msg Message.
      * @param hdr Message header.
      * @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
 
                     IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
+                    U.writeString(out, req.userName());
                     writePath(out, req.path());
                     writePath(out, req.destinationPath());
                     out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
                 case OPEN_CREATE: {
                     IgfsPathControlRequest req = new IgfsPathControlRequest();
 
+                    req.userName(U.readString(in));
                     req.path(readPath(in));
                     req.destinationPath(readPath(in));
                     req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
                 }
             }
 
-            assert msg != null;
-
             msg.command(cmd);
 
             return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
 
         return null;
     }
-
-    /**
-     * Writes string to output.
-     *
-     * @param out Data output.
-     * @param str String.
-     * @throws IOException If write failed.
-     */
-    private void writeString(DataOutput out, @Nullable String str) throws IOException {
-        out.writeBoolean(str != null);
-
-        if (str != null)
-            out.writeUTF(str);
-    }
-
-    /**
-     * Reads string from input.
-     *
-     * @param in Data input.
-     * @return Read string.
-     * @throws IOException If read failed.
-     */
-    @Nullable private String readString(DataInput in) throws IOException {
-        boolean hasStr = in.readBoolean();
-
-        if (hasStr)
-            return in.readUTF();
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.igfs.common;
 
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
     /** Last modification time. */
     private long modificationTime;
 
+    /** The user name this control request is made on behalf of. */
+    private String userName;
+
     /**
      * @param path Path.
      */
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
     }
+
+    /**
+     * Getter for the user name.
+     * @return user name.
+     */
+    public final String userName() {
+        assert userName != null;
+
+        return userName;
+    }
+
+    /**
+     * Setter for the user name.
+     * @param userName the user name.
+     */
+    public final void userName(String userName) {
+        this.userName = IgfsUtils.fixUserName(userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
index 77cd2a0..96639cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
@@ -48,7 +48,7 @@ public class InteropIgnition {
      * @return Ignite instance.
      */
     public static synchronized InteropProcessor start(@Nullable String springCfgPath, @Nullable String gridName,
-                                                      int factoryId, long envPtr) {
+        int factoryId, long envPtr) {
         IgniteConfiguration cfg = configuration(springCfgPath);
 
         if (gridName != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index c93c059..bea4256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -31,7 +31,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
@@ -439,46 +439,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.cache().cache(cacheName).containsKey(key);
                     }
 
-                    @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
-                        @Nullable ClassLoader ldr) {
-                        assert ctx.swap().enabled();
-
-                        try {
-                            ctx.swap().write(spaceName, key, val, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
-                    @SuppressWarnings({"unchecked"})
-                    @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key,
-                        @Nullable ClassLoader ldr) {
-                        try {
-                            assert ctx.swap().enabled();
-
-                            return ctx.swap().readValue(spaceName, key, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public int partition(String cacheName, Object key) {
                         return ctx.cache().cache(cacheName).affinity().partition(key);
                     }
 
-                    @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
-                        try {
-                            assert ctx.swap().enabled();
-
-                            ctx.swap().remove(spaceName, key, null, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
                         for (GridComponent comp : ctx) {
                             IgniteNodeValidationResult err = comp.validateNode(node);
@@ -508,26 +472,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         }
                     }
 
-                    @SuppressWarnings("unchecked")
-                    @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName,
-                        Object key, @Nullable ClassLoader ldr) {
-                        try {
-                            IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName);
-
-                            GridCacheContext cctx = cache.context();
-
-                            if (cctx.isNear())
-                                cctx = cctx.near().dht().context();
-
-                            GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
-
-                            return e != null ? CU.<V>value(e.value(), cctx, true) : null;
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public MessageFormatter messageFormatter() {
                         return ctx.io().formatter();
                     }
@@ -540,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.discovery().tryFailNode(nodeId);
                     }
 
+                    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
+                    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 2e80b6f..ce2a36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     private final GridMessageListener lsnr = new CheckpointRequestListener();
 
     /** */
-    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap;
 
     /** */
-    private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>(
-        MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q);
+    private final Collection<IgniteUuid> closedSess;
 
     /** Grid marshaller. */
     private final Marshaller marsh;
@@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
         super(ctx, ctx.config().getCheckpointSpi());
 
         marsh = ctx.config().getMarshaller();
+
+        if (enabled()) {
+            keyMap = new ConcurrentHashMap8<>();
+
+            closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS,
+                MAX_CLOSED_SESS,
+                0.75f,
+                256,
+                PER_SEGMENT_Q);
+        }
+        else {
+            keyMap = null;
+
+            closedSess = null;
+        }
     }
 
     /** {@inheritDoc} */
@@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Session IDs.
      */
     public Collection<IgniteUuid> sessionIds() {
-        return new ArrayList<>(keyMap.keySet());
+        return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList();
     }
 
     /**
@@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope,
-        long timeout, boolean override) throws IgniteCheckedException {
+    public boolean storeCheckpoint(GridTaskSessionInternal ses,
+        String key,
+        Object state,
+        ComputeTaskSessionScope scope,
+        long timeout,
+        boolean override)
+        throws IgniteCheckedException
+    {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(String key) {
+        if (!enabled())
+            return false;
+
         assert key != null;
 
         boolean rmv = false;
@@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException {
+        if (!enabled())
+            return null;
+
         assert ses != null;
         assert key != null;
 
@@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @param cleanup Whether cleanup or not.
      */
     public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) {
+        if (!enabled())
+            return;
+
         closedSess.add(ses.getId());
 
         // If on task node.
@@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     @Override public void printMemoryStats() {
         X.println(">>>");
         X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']');
-        X.println(">>>  keyMap: " + keyMap.size());
+        X.println(">>>  keyMap: " + (keyMap != null ? keyMap.size() : 0));
     }
 
     /**
@@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
             if (log.isDebugEnabled())
                 log.debug("Received checkpoint request: " + req);
 
+            if (!enabled())
+                return;
+
             IgniteUuid sesId = req.getSessionId();
 
             if (closedSess.contains(sesId)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c877d57..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1211,6 +1211,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
         if (p != null) {
             try {
+                if (p instanceof GridLifecycleAwareMessageFilter)
+                    ((GridLifecycleAwareMessageFilter)p).initialize(ctx);
+                else
+                    ctx.resource().injectGeneric(p);
+
                 addMessageListener(TOPIC_COMM_USER,
                     new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
             }
@@ -1695,13 +1700,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             throws IgniteCheckedException {
             this.topic = topic;
             this.predLsnr = predLsnr;
-
-            if (predLsnr != null) {
-                if (predLsnr instanceof GridLifecycleAwareMessageFilter)
-                    ((GridLifecycleAwareMessageFilter)predLsnr).initialize(ctx);
-                else
-                    ctx.resource().injectGeneric(predLsnr);
-            }
         }
 
         /** {@inheritDoc} */
@@ -1724,69 +1722,84 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            Object msgBody = ioMsg.body();
-
-            assert msgBody != null || ioMsg.bodyBytes() != null;
+            busyLock.readLock();
 
             try {
-                byte[] msgTopicBytes = ioMsg.topicBytes();
-
-                Object msgTopic = ioMsg.topic();
-
-                GridDeployment dep = ioMsg.deployment();
-
-                if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
-                    ioMsg.deploymentClassName() != null) {
-                    dep = ctx.deploy().getGlobalDeployment(
-                        ioMsg.deploymentMode(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.userVersion(),
-                        nodeId,
-                        ioMsg.classLoaderId(),
-                        ioMsg.loaderParticipants(),
-                        null);
-
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException(
-                            "Failed to obtain deployment information for user message. " +
-                            "If you are using custom message or topic class, try implementing " +
-                            "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
-                    ioMsg.deployment(dep); // Cache deployment.
+                if (stopping) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received user message while stopping (will ignore) [nodeId=" +
+                            nodeId + ", msg=" + msg + ']');
+
+                    return;
                 }
 
-                // Unmarshall message topic if needed.
-                if (msgTopic == null && msgTopicBytes != null) {
-                    msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+                Object msgBody = ioMsg.body();
 
-                    ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
-                }
+                assert msgBody != null || ioMsg.bodyBytes() != null;
 
-                if (!F.eq(topic, msgTopic))
-                    return;
+                try {
+                    byte[] msgTopicBytes = ioMsg.topicBytes();
+
+                    Object msgTopic = ioMsg.topic();
+
+                    GridDeployment dep = ioMsg.deployment();
+
+                    if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+                        ioMsg.deploymentClassName() != null) {
+                        dep = ctx.deploy().getGlobalDeployment(
+                            ioMsg.deploymentMode(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.userVersion(),
+                            nodeId,
+                            ioMsg.classLoaderId(),
+                            ioMsg.loaderParticipants(),
+                            null);
+
+                        if (dep == null)
+                            throw new IgniteDeploymentCheckedException(
+                                "Failed to obtain deployment information for user message. " +
+                                    "If you are using custom message or topic class, try implementing " +
+                                    "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+                        ioMsg.deployment(dep); // Cache deployment.
+                    }
 
-                if (msgBody == null) {
-                    msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+                    // Unmarshall message topic if needed.
+                    if (msgTopic == null && msgTopicBytes != null) {
+                        msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
 
-                    ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
-                }
+                        ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+                    }
 
-                // Resource injection.
-                if (dep != null)
-                    ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
-                    msg + ']', e);
-            }
+                    if (!F.eq(topic, msgTopic))
+                        return;
+
+                    if (msgBody == null) {
+                        msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
 
-            if (msgBody != null) {
-                if (predLsnr != null) {
-                    if (!predLsnr.apply(nodeId, msgBody))
-                        removeMessageListener(TOPIC_COMM_USER, this);
+                        ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+                    }
+
+                    // Resource injection.
+                    if (dep != null)
+                        ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+                        msg + ']', e);
+                }
+
+                if (msgBody != null) {
+                    if (predLsnr != null) {
+                        if (!predLsnr.apply(nodeId, msgBody))
+                            removeMessageListener(TOPIC_COMM_USER, this);
+                    }
                 }
             }
+            finally {
+                busyLock.readUnlock();
+            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
new file mode 100644
index 0000000..2005d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.cluster.*;
+
+/**
+ * Listener interface.
+ */
+public interface CustomEventListener<T extends DiscoveryCustomMessage> {
+    /**
+     * @param snd Sender.
+     * @param msg Message.
+     */
+    public void onCustomEvent(ClusterNode snd, T msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
new file mode 100644
index 0000000..23f8bda
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
+
+/**
+ *
+ */
+class CustomMessageWrapper implements DiscoverySpiCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final DiscoveryCustomMessage delegate;
+
+    /**
+     * @param delegate Delegate.
+     */
+    CustomMessageWrapper(DiscoveryCustomMessage delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        DiscoveryCustomMessage res = delegate.ackMessage();
+
+        return res == null ? null : new CustomMessageWrapper(res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return delegate.isMutable();
+    }
+
+    /**
+     * @return Delegate.
+     */
+    public DiscoveryCustomMessage delegate() {
+        return delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return delegate.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
new file mode 100644
index 0000000..401486d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ *
+ */
+public interface DiscoveryCustomMessage extends Serializable {
+    /**
+     * @return Unique custom message ID.
+     */
+    public IgniteUuid id();
+
+    /**
+     * Whether or not minor version of topology should be increased on message receive.
+     *
+     * @return {@code true} if minor topology version should be increased.
+     * @see AffinityTopologyVersion#minorTopVer
+     */
+    public boolean incrementMinorTopologyVersion();
+
+    /**
+     * Called when custom message has been handled by all nodes.
+     *
+     * @return Ack message or {@code null} if ack is not required.
+     */
+    @Nullable public DiscoveryCustomMessage ackMessage();
+
+    /**
+     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     */
+    public boolean isMutable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 0950774..71fbc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -165,10 +166,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private final GridLocalMetrics metrics = createMetrics();
 
     /** Metrics update worker. */
-    private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+    private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
 
     /** Custom event listener. */
-    private GridPlainInClosure<Serializable> customEvtLsnr;
+    private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
+        new ConcurrentHashMap8<>();
 
     /** Map of dynamic cache filters. */
     private Map<String, CachePredicate> registeredCaches = new HashMap<>();
@@ -176,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** Received custom messages history. */
+    private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -214,6 +219,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @param cacheName Cache name.
      * @param filter Cache filter.
+     * @param nearEnabled Near enabled flag.
      * @param loc {@code True} if cache is local.
      */
     public void setCacheFilter(
@@ -240,12 +246,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @param cacheName Cache name.
      * @param clientNodeId Near node ID.
+     * @param nearEnabled Near enabled flag.
      */
     public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+        CachePredicate pred = registeredCaches.get(cacheName);
 
-        if (predicate != null)
-            predicate.addClientNode(clientNodeId, nearEnabled);
+        if (pred != null)
+            pred.addClientNode(clientNodeId, nearEnabled);
     }
 
     /**
@@ -279,17 +286,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
     }
 
-    /**
-     * @param evtType Event type.
-     * @return Next affinity topology version.
-     */
-    private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) {
-        if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)
-            minorTopVer++;
-        else if (evtType != EVT_NODE_METRICS_UPDATED)
-            minorTopVer = 0;
-
-        return new AffinityTopologyVersion(topVer, minorTopVer);
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
+            ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
+                    "(set TcpDiscoverySpi.forceServerMode to false)");
     }
 
     /** {@inheritDoc} */
@@ -328,7 +329,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             checkSegmentOnStart();
         }
 
-        new IgniteThread(metricsUpdater).start();
+        metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
 
         spi.setMetricsProvider(createMetricsProvider());
 
@@ -356,14 +357,41 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 ClusterNode node,
                 Collection<ClusterNode> topSnapshot,
                 Map<Long, Collection<ClusterNode>> snapshots,
-                @Nullable Serializable data
+                @Nullable DiscoverySpiCustomMessage spiCustomMsg
             ) {
+                DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
+                    : ((CustomMessageWrapper)spiCustomMsg).delegate();
+
+                if (skipMessage(type, customMsg))
+                    return;
+
                 final ClusterNode locNode = localNode();
 
                 if (snapshots != null)
                     topHist = snapshots;
 
-                AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer);
+                boolean verChanged;
+
+                if (type == EVT_NODE_METRICS_UPDATED)
+                    verChanged = false;
+                else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+                    assert customMsg != null;
+
+                    if (customMsg.incrementMinorTopologyVersion()) {
+                        minorTopVer++;
+
+                        verChanged = true;
+                    }
+                    else
+                        verChanged = false;
+                }
+                else {
+                    minorTopVer = 0;
+
+                    verChanged = true;
+                }
+
+                AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
                 if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) {
                     for (DiscoCache c : discoCacheHist.values())
@@ -373,19 +401,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 }
 
                 if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
-                    try {
-                        if (customEvtLsnr != null)
-                            customEvtLsnr.apply(data);
-                    }
-                    catch (Exception e) {
-                        U.error(log, "Failed to notify direct custom event listener: " + data, e);
+                    for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
+                        List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
+
+                        if (list != null) {
+                            for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+                                try {
+                                    lsnr.onCustomEvent(node, customMsg);
+                                }
+                                catch (Exception e) {
+                                    U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
+                                }
+                            }
+                        }
                     }
                 }
 
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.
-                if (type != EVT_NODE_METRICS_UPDATED) {
+                if (verChanged) {
                     DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
 
                     discoCacheHist.put(nextTopVer, cache);
@@ -417,7 +452,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     return;
                 }
 
-                discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data);
+                discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
             }
         });
 
@@ -486,10 +521,43 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @param customEvtLsnr Custom event listener.
+     * @param type Message type.
+     * @param customMsg Custom message.
+     * @return {@code True} if should not process message.
      */
-    public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) {
-        this.customEvtLsnr = customEvtLsnr;
+    private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
+        if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+            assert customMsg != null && customMsg.id() != null : customMsg;
+
+            if (rcvdCustomMsgs.contains(customMsg.id())) {
+                if (log.isDebugEnabled())
+                    log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]");
+
+                return true;
+            }
+
+            rcvdCustomMsgs.addLast(customMsg.id());
+
+            while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE)
+                rcvdCustomMsgs.pollFirst();
+        }
+
+        return false;
+    }
+
+    /**
+     * @param msgCls Message class.
+     * @param lsnr Custom event listener.
+     */
+    public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {
+        List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls);
+
+        if (list == null) {
+            list = F.addIfAbsent(customEvtLsnrs, msgCls,
+                new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>());
+        }
+
+        list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr);
     }
 
     /**
@@ -660,7 +728,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 Map<Integer, CacheMetrics> metrics = null;
 
                 for (GridCacheAdapter<?, ?> cache : caches) {
-                    if (cache.context().started() && cache.configuration().isStatisticsEnabled()) {
+                    if (cache.configuration().isStatisticsEnabled() &&
+                        cache.context().started() &&
+                        cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0) {
                         if (metrics == null)
                             metrics = U.newHashMap(caches.size());
 
@@ -952,11 +1022,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         getSpi().setListener(null);
 
         // Stop discovery worker and metrics updater.
+        U.closeQuiet(metricsUpdateTask);
+
         U.cancel(discoWrk);
-        U.cancel(metricsUpdater);
 
         U.join(discoWrk, log);
-        U.join(metricsUpdater, log);
 
         // Stop SPI itself.
         stopSpi();
@@ -1218,13 +1288,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Gets alive remote nodes with at least one cache configured.
+     * Gets alive remote server nodes with at least one cache configured.
      *
      * @param topVer Topology version (maximum allowed node order).
      * @return Collection of alive cache nodes.
      */
-    public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion());
+    public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
+    }
+
+    /**
+     * Gets alive server nodes with at least one cache configured.
+     *
+     * @param topVer Topology version (maximum allowed node order).
+     * @return Collection of alive cache nodes.
+     */
+    public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
     }
 
     /**
@@ -1256,9 +1336,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if node is a cache data node.
      */
     public boolean cacheAffinityNode(ClusterNode node, String cacheName) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+        CachePredicate pred = registeredCaches.get(cacheName);
 
-        return predicate != null && predicate.dataNode(node);
+        return pred != null && pred.dataNode(node);
     }
 
     /**
@@ -1267,9 +1347,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if node has near cache enabled.
      */
     public boolean cacheNearNode(ClusterNode node, String cacheName) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+        CachePredicate pred = registeredCaches.get(cacheName);
 
-        return predicate != null && predicate.nearNode(node);
+        return pred != null && pred.nearNode(node);
     }
 
     /**
@@ -1278,9 +1358,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if node has client cache (without near cache).
      */
     public boolean cacheClientNode(ClusterNode node, String cacheName) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+        CachePredicate pred = registeredCaches.get(cacheName);
 
-        return predicate != null && predicate.clientNode(node);
+        return pred != null && pred.clientNode(node);
     }
 
     /**
@@ -1289,9 +1369,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return If cache with the given name is accessible on the given node.
      */
     public boolean cacheNode(ClusterNode node, String cacheName) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+        CachePredicate pred = registeredCaches.get(cacheName);
 
-        return predicate != null && predicate.cacheNode(node);
+        return pred != null && pred.cacheNode(node);
     }
 
     /**
@@ -1384,10 +1464,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @param evt Event.
+     * @param msg Custom message.
      */
-    public void sendCustomEvent(Serializable evt) {
-        getSpi().sendCustomEvent(evt);
+    public void sendCustomEvent(DiscoveryCustomMessage msg) {
+        getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
     }
 
     /**
@@ -1542,8 +1622,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
         /** Event queue. */
-        private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts =
-            new LinkedBlockingQueue<>();
+        private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+            DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
 
         /** Node segmented event fired flag. */
         private boolean nodeSegFired;
@@ -1609,9 +1689,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             AffinityTopologyVersion topVer,
             ClusterNode node,
             Collection<ClusterNode> topSnapshot,
-            @Nullable Serializable data
+            @Nullable DiscoveryCustomMessage data
         ) {
-            assert node != null;
+            assert node != null : data;
 
             evts.add(F.t(type, topVer, node, topSnapshot, data));
         }
@@ -1650,7 +1730,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         /** @throws InterruptedException If interrupted. */
         @SuppressWarnings("DuplicateCondition")
         private void body0() throws InterruptedException {
-            GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take();
+            GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+                DiscoveryCustomMessage> evt = evts.take();
 
             int type = evt.get1();
 
@@ -1768,7 +1849,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         customEvt.type(type);
                         customEvt.topologySnapshot(topVer.topologyVersion(), null);
                         customEvt.affinityTopologyVersion(topVer);
-                        customEvt.data(evt.get5());
+                        customEvt.customMessage(evt.get5());
 
                         ctx.event().record(customEvt);
                     }
@@ -1833,28 +1914,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      *
      */
-    private class MetricsUpdater extends GridWorker {
+    private class MetricsUpdater implements Runnable {
         /** */
         private long prevGcTime = -1;
 
         /** */
         private long prevCpuTime = -1;
 
-        /**
-         *
-         */
-        private MetricsUpdater() {
-            super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
-        }
-
         /** {@inheritDoc} */
-        @Override protected void body() throws IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                U.sleep(METRICS_UPDATE_FREQ);
-
-                gcCpuLoad = getGcCpuLoad();
-                cpuLoad = getCpuLoad();
-            }
+        @Override public void run() {
+            gcCpuLoad = getGcCpuLoad();
+            cpuLoad = getCpuLoad();
         }
 
         /**
@@ -2065,9 +2135,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         private final Collection<ClusterNode> aliveNodesWithCaches;
 
         /**
-         * Cached alive remote nodes with caches.
+         * Cached alive server remote nodes with caches.
+         */
+        private final Collection<ClusterNode> aliveSrvNodesWithCaches;
+
+        /**
+         * Cached alive remote server nodes with caches.
          */
-        private final Collection<ClusterNode> aliveRmtNodesWithCaches;
+        private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
 
         /**
          * @param loc Local node.
@@ -2088,21 +2163,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             all.addAll(rmtNodes);
 
+            Collections.sort(all, GridNodeOrderComparator.INSTANCE);
+
             allNodes = Collections.unmodifiableList(all);
 
-            Map<String, Collection<ClusterNode>> cacheMap =
-                new HashMap<>(allNodes.size(), 1.0f);
-            Map<String, Collection<ClusterNode>> rmtCacheMap =
-                new HashMap<>(allNodes.size(), 1.0f);
-            Map<String, Collection<ClusterNode>> dhtNodesMap =
-                new HashMap<>(allNodes.size(), 1.0f);
+            Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
+            Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
+            Map<String, Collection<ClusterNode>> dhtNodesMap =new HashMap<>(allNodes.size(), 1.0f);
             Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
             Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
 
             aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
             nodesByVer = new TreeMap<>();
 
             long maxOrder0 = 0;
@@ -2154,8 +2229,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (alive(node.id())) {
                         aliveNodesWithCaches.add(node);
 
-                        if (!loc.id().equals(node.id()))
-                            aliveRmtNodesWithCaches.add(node);
+                        if (!CU.clientNode(node)) {
+                            aliveSrvNodesWithCaches.add(node);
+
+                            if (!loc.id().equals(node.id()))
+                                aliveRmtSrvNodesWithCaches.add(node);
+                        }
                     }
                 }
 
@@ -2240,13 +2319,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * @return All nodes with at least one cache configured.
-         */
-        Collection<ClusterNode> allNodesWithCaches() {
-            return allNodesWithCaches;
-        }
-
-        /**
          * Gets collection of nodes which have version equal or greater than {@code ver}.
          *
          * @param ver Version to check.
@@ -2345,13 +2417,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all alive remote nodes with at least one cache configured.
+         * Gets all alive remote server nodes with at least one cache configured.
          *
          * @param topVer Topology version.
          * @return Collection of nodes.
          */
-        Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveRmtNodesWithCaches);
+        Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
+            return filter(topVer, aliveRmtSrvNodesWithCaches);
+        }
+
+        /**
+         * Gets all alive server nodes with at least one cache configured.
+         *
+         * @param topVer Topology version.
+         * @return Collection of nodes.
+         */
+        Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
+            return filter(topVer, aliveSrvNodesWithCaches);
         }
 
         /**
@@ -2388,7 +2470,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             filterNodeMap(aliveRmtCacheNodes, leftNode);
 
             aliveNodesWithCaches.remove(leftNode);
-            aliveRmtNodesWithCaches.remove(leftNode);
+            aliveSrvNodesWithCaches.remove(leftNode);
+            aliveRmtSrvNodesWithCaches.remove(leftNode);
         }
 
         /**
@@ -2480,11 +2563,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         private boolean loc;
 
         /** Collection of client near nodes. */
-        private Map<UUID, Boolean> clientNodes;
+        private ConcurrentHashMap<UUID, Boolean> clientNodes;
 
         /**
          * @param cacheFilter Cache filter.
          * @param nearEnabled Near enabled flag.
+         * @param loc {@code True} if cache is local.
          */
         private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) {
             assert cacheFilter != null;
@@ -2498,9 +2582,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         /**
          * @param nodeId Near node ID to add.
+         * @param nearEnabled Near enabled flag.
          */
         public void addClientNode(UUID nodeId, boolean nearEnabled) {
-            clientNodes.put(nodeId, nearEnabled);
+            clientNodes.putIfAbsent(nodeId, nearEnabled);
         }
 
         /**
@@ -2515,7 +2600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if this node is a data node for given cache.
          */
         public boolean dataNode(ClusterNode node) {
-            return !node.isDaemon() && cacheFilter.apply(node);
+            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
         }
 
         /**
@@ -2523,8 +2608,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if cache is accessible on the given node.
          */
         public boolean cacheNode(ClusterNode node) {
-            return !node.isClient() && !node.isDaemon() &&
-                (cacheFilter.apply(node) || clientNodes.containsKey(node.id()));
+            return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
         }
 
         /**
@@ -2535,8 +2619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             if (node.isDaemon())
                 return false;
 
-            if (nearEnabled && cacheFilter.apply(node))
-                return true;
+            if (CU.affinityNode(node, cacheFilter))
+                return nearEnabled;
 
             Boolean near = clientNodes.get(node.id());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
index 9a81cd1..f1561bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.*;
 import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
 
@@ -46,9 +45,6 @@ public class GridIndexingManager extends GridManagerAdapter<IndexingSpi> {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @Override public void start() throws IgniteCheckedException {
-        if (!enabled())
-            U.warn(log, "Indexing is disabled (to enable please configure GridIndexingSpi).");
-
         startSpi();
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index e9df8b8..5373e46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -68,6 +68,18 @@ class GridAffinityAssignment implements Serializable {
     }
 
     /**
+     * @param topVer Topology version.
+     * @param aff Assignment to copy from.
+     */
+    GridAffinityAssignment(AffinityTopologyVersion topVer, GridAffinityAssignment aff) {
+        this.topVer = topVer;
+
+        assignment = aff.assignment;
+        primary = aff.primary;
+        backup = aff.backup;
+    }
+
+    /**
      * @return Affinity assignment.
      */
     public List<List<ClusterNode>> assignment() {


Mime
View raw message