ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [05/41] ignite git commit: ignite-3038 Do not use custom discovery events to start continuous queries for system caches
Date Fri, 17 Jun 2016 08:54:14 GMT
ignite-3038 Do not use custom discovery events to start continuous queries for system caches


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f878c56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f878c56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f878c56

Branch: refs/heads/ignite-3331
Commit: 7f878c56cf4e63cff1773dfe834cda7cd91c62ac
Parents: e10ffef
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jun 14 10:17:50 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jun 14 10:17:50 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |  51 ++-
 .../processors/cache/GridCacheAdapter.java      |  12 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |  29 +-
 .../continuous/CacheContinuousQueryManager.java | 135 +++++++
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 217 +++++++---
 .../service/GridServiceProcessor.java           | 391 ++++++++++---------
 ...eClientReconnectContinuousProcessorTest.java |  60 ++-
 ...ridCacheContinuousQueryAbstractSelfTest.java |   2 +-
 .../IgniteNoCustomEventsOnNodeStart.java        |  80 ++++
 .../service/GridServiceClientNodeTest.java      | 102 ++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 13 files changed, 813 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 8f566a9..b4c9607 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
 import org.apache.ignite.internal.util.GridStripedLock;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.PluginProvider;
 
@@ -64,6 +65,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     /** Non-volatile on purpose. */
     private int failedCnt;
 
+    /** */
+    private ContinuousQueryListener lsnr;
+
     /**
      * @param plugins Plugins.
      * @throws IgniteCheckedException In case of error.
@@ -75,25 +79,58 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
     }
 
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
+
+            ctx.continuous().registerStaticRoutine(
+                CU.MARSH_CACHE_NAME,
+                lsnr,
+                null,
+                null);
+        }
+    }
+
+    /**
      * @param ctx Kernal context.
      * @throws IgniteCheckedException In case of error.
      */
     public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
 
-        if (!ctx.isDaemon()) {
+        log = ctx.log(MarshallerContextImpl.class);
+
+        cache = ctx.cache().marshallerCache();
+
+        if (ctx.cache().marshallerCache().context().affinityNode()) {
             ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
-                new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+                new ContinuousQueryListener(log, workDir),
                 null,
-                ctx.cache().marshallerCache().context().affinityNode(),
+                true,
                 true,
                 false
             );
         }
-
-        log = ctx.log(MarshallerContextImpl.class);
-
-        cache = ctx.cache().marshallerCache();
+        else {
+            if (lsnr != null) {
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @SuppressWarnings("unchecked")
+                    @Override public void run() {
+                        try {
+                            Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
+
+                            lsnr.onUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load marshaller cache entries: " + e, e);
+                        }
+                    }
+                });
+            }
+        }
 
         latch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6e647c5..b4daec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4138,8 +4138,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /**
      * @return Distributed ignite cache iterator.
+     * @throws IgniteCheckedException If failed.
      */
     public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
+        return igniteIterator(ctx.keepBinary());
+    }
+
+    /**
+     * @param keepBinary
+     * @return Distributed ignite cache iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException {
         GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4147,7 +4157,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, keepBinary)
             .keepAll(false)
             .executeScanQuery();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index fe73f8a..d5756a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -154,9 +154,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** Metadata updates collected before metadata cache is initialized. */
     private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
 
-    /** */
-    private UUID metaCacheQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -260,6 +257,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (clientNode && !ctx.isDaemon()) {
+            ctx.continuous().registerStaticRoutine(
+                CU.UTILITY_CACHE_NAME,
+                new MetaDataEntryListener(),
+                new MetaDataEntryFilter(),
+                null);
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
@@ -276,13 +284,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         if (clientNode) {
             assert !metaDataCache.context().affinityNode();
 
-            metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
-                new MetaDataEntryListener(),
-                new MetaDataEntryFilter(),
-                false,
-                true,
-                false);
-
             while (true) {
                 ClusterNode oldestSrvNode =
                     CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
@@ -363,14 +364,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (metaCacheQryId != null)
-            metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
-    }
-
     /**
      * @param key Metadata key.
      * @param newMeta Metadata.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b042249..c966527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryCreatedListener;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
@@ -57,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
@@ -728,6 +732,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param keepBinary Keep binary flag.
+     * @param filter Filter.
+     * @return Iterable for events created for existing cache entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterable<CacheEntryEvent<?, ?>> existingEntries(final boolean keepBinary, final CacheEntryEventFilter filter)
+        throws IgniteCheckedException {
+        final Iterator<Cache.Entry<?, ?>> it = cctx.cache().igniteIterator(keepBinary);
+
+        final Cache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+        return new Iterable<CacheEntryEvent<?, ?>>() {
+            @Override public Iterator<CacheEntryEvent<?, ?>> iterator() {
+                return new Iterator<CacheEntryEvent<?, ?>>() {
+                    private CacheQueryEntryEvent<?, ?> next;
+
+                    {
+                        advance();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return next != null;
+                    }
+
+                    @Override public CacheEntryEvent<?, ?> next() {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        CacheEntryEvent next0 = next;
+
+                        advance();
+
+                        return next0;
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    private void advance() {
+                        next = null;
+
+                        while (next == null) {
+                            if (!it.hasNext())
+                                break;
+
+                            Cache.Entry e = it.next();
+
+                            next = new CacheEntryEventImpl(
+                                cache,
+                                CREATED,
+                                e.getKey(),
+                                e.getValue());
+
+                            if (filter != null && !filter.evaluate(next))
+                                next = null;
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    /**
      * @param nodes Nodes.
      * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
      *     otherwise {@code false}.
@@ -1129,4 +1197,71 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 lsnr.acknowledgeBackupOnTimeout(ctx);
         }
     }
+
+    /**
+     *
+     */
+    private static class CacheEntryEventImpl extends CacheQueryEntryEvent {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @GridToStringInclude
+        private Object key;
+
+        /** */
+        @GridToStringInclude
+        private Object val;
+
+        /**
+         * @param src Event source.
+         * @param evtType Event type.
+         * @param key Key.
+         * @param val Value.
+         */
+        public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
+            super(src, evtType);
+
+            this.key = key;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getPartitionUpdateCounter() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getOldValue() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isOldValueAvailable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object unwrap(Class cls) {
+            if (cls.isAssignableFrom(getClass()))
+                return cls.cast(this);
+
+            throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheEntryEventImpl.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index cadf1a9..99de507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -34,6 +35,12 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface IgniteCacheObjectProcessor extends GridProcessor {
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
+
+    /**
      * @see GridComponent#onKernalStart()
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 75d65de..6630c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -235,6 +235,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fd798df..e96e646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -30,12 +30,15 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -64,7 +67,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -83,6 +85,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
@@ -137,6 +140,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** */
     private boolean processorStopped;
 
+    /** Query sequence number for message topic. */
+    private final AtomicLong seq = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -255,13 +261,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         UUID routineId = msg.routineId();
 
                         unregisterRemote(routineId);
+                    }
 
-                        if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
-
-                            if (clientRoutineMap != null)
-                                clientRoutineMap.remove(msg.routineId());
-                        }
+                    for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+                        if (clientInfo.remove(msg.routineId()) != null)
+                            break;
                     }
                 }
             });
@@ -310,6 +314,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         });
 
+        ctx.marshallerContext().onContinuousProcessorStarted(ctx);
+
+        ctx.cacheObjects().onContinuousProcessorStarted(ctx);
+
+        ctx.service().onContinuousProcessorStarted(ctx);
+
         if (log.isDebugEnabled())
             log.debug("Continuous processor started.");
     }
@@ -393,16 +403,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("collectDiscoveryData [node=" + nodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos +
+                ']');
+        }
+
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
-                Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+                Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
 
                 for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
-                    copy.put(e0.getKey(), e0.getValue());
+                    cp.put(e0.getKey(), e0.getValue());
+
+                clientInfos0.put(e.getKey(), cp);
+            }
+
+            if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+                Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
 
-                clientInfos0.put(e.getKey(), copy);
+                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
+                    infos.put(e.getKey(), e.getValue());
+
+                clientInfos0.put(ctx.localNodeId(), infos);
             }
 
             DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
@@ -430,6 +457,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
         DiscoveryData data = (DiscoveryData)obj;
 
+        if (log.isDebugEnabled()) {
+            log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
+                ", rmtNodeId=" + rmtNodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", data=" + data +
+                ']');
+        }
+
         if (!ctx.isDaemon() && data != null) {
             for (DiscoveryDataItem item : data.items) {
                 try {
@@ -457,29 +492,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
                 UUID clientNodeId = entry.getKey();
 
-                Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+                if (!ctx.localNodeId().equals(clientNodeId)) {
+                    Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
 
-                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
-                    UUID routineId = e.getKey();
-                    LocalRoutineInfo info = e.getValue();
+                    for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                        UUID routineId = e.getKey();
+                        LocalRoutineInfo info = e.getValue();
 
-                    try {
-                        if (info.prjPred != null)
-                            ctx.resource().injectGeneric(info.prjPred);
-
-                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                            if (registerHandler(clientNodeId,
-                                routineId,
-                                info.hnd,
-                                info.bufSize,
-                                info.interval,
-                                info.autoUnsubscribe,
-                                false))
-                                info.hnd.onListenerRegistered(routineId, ctx);
+                        try {
+                            if (info.prjPred != null)
+                                ctx.resource().injectGeneric(info.prjPred);
+
+                            if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                                if (registerHandler(clientNodeId,
+                                    routineId,
+                                    info.hnd,
+                                    info.bufSize,
+                                    info.interval,
+                                    info.autoUnsubscribe,
+                                    false))
+                                    info.hnd.onListenerRegistered(routineId, ctx);
+                            }
+                        }
+                        catch (IgniteCheckedException err) {
+                            U.error(log, "Failed to register continuous handler.", err);
                         }
-                    }
-                    catch (IgniteCheckedException err) {
-                        U.error(log, "Failed to register continuous handler.", err);
                     }
                 }
 
@@ -537,6 +574,47 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Registers routine info to be sent in discovery data during this node join
+     * (to be used for internal queries started from client nodes).
+     *
+     * @param cacheName Cache name.
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param prjPred Projection predicate.
+     * @return Routine ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    public UUID registerStaticRoutine(
+        String cacheName,
+        CacheEntryUpdatedListener<?, ?> locLsnr,
+        CacheEntryEventSerializableFilter rmtFilter,
+        @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException {
+        String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
+
+        CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
+            cacheName,
+            TOPIC_CACHE.topic(topicPrefix, ctx.localNodeId(), seq.incrementAndGet()),
+            locLsnr,
+            rmtFilter,
+            true,
+            false,
+            true,
+            false);
+
+        hnd.internal(true);
+
+        final UUID routineId = UUID.randomUUID();
+
+        LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true);
+
+        locInfos.put(routineId, routineInfo);
+
+        registerMessageListener(hnd);
+
+        return routineId;
+    }
+
+    /**
      * @param hnd Handler.
      * @param bufSize Buffer size.
      * @param interval Time interval.
@@ -610,29 +688,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Register per-routine notifications listener if ordered messaging is used.
-        if (hnd.orderedTopic() != null) {
-            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object obj) {
-                    GridContinuousMessage msg = (GridContinuousMessage)obj;
-
-                    // Only notification can be ordered.
-                    assert msg.type() == MSG_EVT_NOTIFICATION;
-
-                    if (msg.data() == null && msg.dataBytes() != null) {
-                        try {
-                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to process message (ignoring): " + msg, e);
-
-                            return;
-                        }
-                    }
-
-                    processNotification(nodeId, msg);
-                }
-            });
-        }
+        registerMessageListener(hnd);
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
@@ -664,6 +720,35 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param hnd Handler.
+     */
+    private void registerMessageListener(GridContinuousHandler hnd) {
+        if (hnd.orderedTopic() != null) {
+            ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object obj) {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
+
+                    // Only notification can be ordered.
+                    assert msg.type() == MSG_EVT_NOTIFICATION;
+
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message (ignoring): " + msg, e);
+
+                            return;
+                        }
+                    }
+
+                    processNotification(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
      * @param routineId Consume ID.
      * @return Future.
      */
@@ -807,12 +892,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
 
-        for (UUID rmtId : rmtInfos.keySet())
-            unregisterRemote(rmtId);
+        if (log.isDebugEnabled()) {
+            log.debug("onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
+
+        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+            RemoteRoutineInfo info = e.getValue();
+
+            if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe)
+                unregisterRemote(e.getKey());
+        }
 
         rmtInfos.clear();
 
         clientInfos.clear();
+
+        if (log.isDebugEnabled()) {
+            log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
     }
 
     /**
@@ -1038,6 +1139,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (doRegister) {
+            if (log.isDebugEnabled())
+                log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
+
             if (interval > 0) {
                 IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
                     @SuppressWarnings("ConstantConditions")
@@ -1152,6 +1256,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             stopLock.unlock();
         }
 
+        if (log.isDebugEnabled())
+            log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']');
+
         if (remote != null)
             unregisterHandler(routineId, remote.hnd, false);
         else if (loc != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 97d9988..853e6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -144,12 +145,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
-    /** Deployment listener ID. */
-    private UUID cfgQryId;
-
-    /** Assignment listener ID. */
-    private UUID assignQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -166,6 +161,22 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false));
     }
 
+    /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (ctx.clientNode()) {
+            assert !ctx.isDaemon();
+
+            ctx.continuous().registerStaticRoutine(
+                CU.UTILITY_CACHE_NAME,
+                new ServiceEntriesListener(),
+                null,
+                null);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         ctx.addNodeAttribute(ATTR_SERVICES_COMPATIBILITY_MODE, srvcCompatibilitySysProp);
@@ -197,13 +208,32 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            boolean affNode = cache.context().affinityNode();
+            if (!ctx.clientNode()) {
+                assert cache.context().affinityNode();
+
+                cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
+                    null,
+                    true,
+                    true,
+                    false);
+            }
+            else {
+                assert !ctx.isDaemon();
 
-            cfgQryId = cache.context().continuousQueries().executeInternalQuery(
-                new DeploymentListener(), null, affNode, true, !affNode);
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Iterable<CacheEntryEvent<?, ?>> entries =
+                                cache.context().continuousQueries().existingEntries(false, null);
 
-            assignQryId = cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, affNode, true, !affNode);
+                            onSystemCacheUpdated(entries);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to load service entries: " + e, e);
+                        }
+                    }
+                });
+            }
         }
         finally {
             if (ctx.deploy().enabled())
@@ -237,12 +267,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (!ctx.clientNode())
             ctx.event().removeLocalEventListener(topLsnr);
 
-        if (cfgQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
-
-        if (assignQryId != null)
-            cache.context().continuousQueries().cancelInternalQuery(assignQryId);
-
         Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
         synchronized (locSvcs) {
@@ -1284,148 +1308,171 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      * Service deployment listener.
      */
-    private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
+    @SuppressWarnings("unchecked")
+    private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
         /** {@inheritDoc} */
         @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
             depExe.submit(new BusyRunnable() {
                 @Override public void run0() {
-                    boolean firstTime = true;
+                    onSystemCacheUpdated(deps);
+                }
+            });
+        }
+    }
 
-                    for (CacheEntryEvent<?, ?> e : deps) {
-                        if (!(e.getKey() instanceof GridServiceDeploymentKey))
-                            continue;
+    /**
+     * @param evts Update events.
+     */
+    private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
+        boolean firstTime = true;
 
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
+        for (CacheEntryEvent<?, ?> e : evts) {
+            if (e.getKey() instanceof GridServiceDeploymentKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                            firstTime = false;
-                        }
+                    firstTime = false;
+                }
 
-                        GridServiceDeployment dep;
+                processDeployment((CacheEntryEvent)e);
+            }
+            else if (e.getKey() instanceof GridServiceAssignmentsKey) {
+                if (firstTime) {
+                    markCompatibilityStateAsUsed();
 
-                        try {
-                            dep = (GridServiceDeployment)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+                    firstTime = false;
+                }
+
+                processAssignment((CacheEntryEvent)e);
+            }
+        }
+    }
 
-                        if (dep != null) {
-                            svcName.set(dep.configuration().getName());
+    /**
+     * @param e Entry.
+     */
+    private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
+        GridServiceDeployment dep;
 
-                            // Ignore other utility cache events.
-                            long topVer = ctx.discovery().topologyVersion();
+        try {
+            dep = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+        if (dep != null) {
+            svcName.set(dep.configuration().getName());
 
-                            if (oldest.isLocal())
-                                onDeployment(dep, topVer);
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceDeploymentKey)e.getKey()).name();
+            // Ignore other utility cache events.
+            long topVer = ctx.discovery().topologyVersion();
 
-                            svcName.set(name);
+            ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
-                            Collection<ServiceContextImpl> ctxs;
+            if (oldest.isLocal())
+                onDeployment(dep, topVer);
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            svcName.set(name);
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
+            Collection<ServiceContextImpl> ctxs;
 
-                            // Finish deployment futures if undeployment happened.
-                            GridFutureAdapter<?> fut = depFuts.remove(name);
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            if (fut != null)
-                                fut.onDone();
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
+                }
+            }
 
-                            // Complete undeployment future.
-                            fut = undepFuts.remove(name);
+            // Finish deployment futures if undeployment happened.
+            GridFutureAdapter<?> fut = depFuts.remove(name);
 
-                            if (fut != null)
-                                fut.onDone();
+            if (fut != null)
+                fut.onDone();
 
-                            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+            // Complete undeployment future.
+            fut = undepFuts.remove(name);
 
-                            // Remove assignment on primary node in case of undeploy.
-                            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
-                                try {
-                                    cache.getAndRemove(key);
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
-                                }
-                            }
-                        }
-                    }
+            if (fut != null)
+                fut.onDone();
+
+            GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
+
+            // Remove assignment on primary node in case of undeploy.
+            if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
+                try {
+                    cache.getAndRemove(key);
                 }
-            });
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
+                }
+            }
         }
+    }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final long topVer) {
-            // Retry forever.
-            try {
-                long newTopVer = ctx.discovery().topologyVersion();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final long topVer) {
+        // Retry forever.
+        try {
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer == topVer)
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer == topVer)
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
 
-                long newTopVer = ctx.discovery().topologyVersion();
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                if (newTopVer != topVer) {
-                    assert newTopVer > topVer;
+            if (newTopVer != topVer) {
+                assert newTopVer > topVer;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
-            }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 
@@ -1585,79 +1632,59 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Assignment listener.
+     * @param e Entry.
      */
-    private class AssignmentListener implements CacheEntryUpdatedListener<Object, Object> {
-        /** {@inheritDoc} */
-        @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> assignCol) throws CacheEntryListenerException {
-            depExe.submit(new BusyRunnable() {
-                @Override public void run0() {
-                    boolean firstTime = true;
-
-                    for (CacheEntryEvent<?, ?> e : assignCol) {
-                        if (!(e.getKey() instanceof GridServiceAssignmentsKey))
-                            continue;
-
-                        if (firstTime) {
-                            markCompatibilityStateAsUsed();
+    private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
+        GridServiceAssignments assigns;
 
-                            firstTime = false;
-                        }
-
-                        GridServiceAssignments assigns;
-
-                        try {
-                            assigns = (GridServiceAssignments)e.getValue();
-                        }
-                        catch (IgniteException ex) {
-                            if (X.hasCause(ex, ClassNotFoundException.class))
-                                continue;
-                            else
-                                throw ex;
-                        }
+        try {
+            assigns = e.getValue();
+        }
+        catch (IgniteException ex) {
+            if (X.hasCause(ex, ClassNotFoundException.class))
+                return;
+            else
+                throw ex;
+        }
 
-                        if (assigns != null) {
-                            svcName.set(assigns.name());
+        if (assigns != null) {
+            svcName.set(assigns.name());
 
-                            Throwable t = null;
+            Throwable t = null;
 
-                            try {
-                                redeploy(assigns);
-                            }
-                            catch (Error | RuntimeException th) {
-                                t = th;
-                            }
+            try {
+                redeploy(assigns);
+            }
+            catch (Error | RuntimeException th) {
+                t = th;
+            }
 
-                            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
 
-                            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
-                                depFuts.remove(assigns.name(), fut);
+            if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
+                depFuts.remove(assigns.name(), fut);
 
-                                // Complete deployment futures once the assignments have been stored in cache.
-                                fut.onDone(null, t);
-                            }
-                        }
-                        // Handle undeployment.
-                        else {
-                            String name = ((GridServiceAssignmentsKey)e.getKey()).name();
+                // Complete deployment futures once the assignments have been stored in cache.
+                fut.onDone(null, t);
+            }
+        }
+        // Handle undeployment.
+        else {
+            String name = e.getKey().name();
 
-                            svcName.set(name);
+            svcName.set(name);
 
-                            Collection<ServiceContextImpl> ctxs;
+            Collection<ServiceContextImpl> ctxs;
 
-                            synchronized (locSvcs) {
-                                ctxs = locSvcs.remove(name);
-                            }
+            synchronized (locSvcs) {
+                ctxs = locSvcs.remove(name);
+            }
 
-                            if (ctxs != null) {
-                                synchronized (ctxs) {
-                                    cancel(ctxs, ctxs.size());
-                                }
-                            }
-                        }
-                    }
+            if (ctxs != null) {
+                synchronized (ctxs) {
+                    cancel(ctxs, ctxs.size());
                 }
-            });
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index 4c44adc..0ff5883 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -125,6 +125,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @param stopFromClient If {@code true} stops listener from client node, otherwise from server.
      * @throws Exception If failed.
      */
     private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
@@ -178,9 +179,11 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
         assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
         assertTrue(latch.await(5000, MILLISECONDS));
 
-        log.info("Stop listen, should not get remote messages anymore.");
+        Ignite stopFrom = (stopFromClient ? client : srv);
 
-        (stopFromClient ? client : srv).message().stopRemoteListen(opId);
+        log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']');
+
+        stopFrom.message().stopRemoteListen(opId);
 
         srv.message().send(topic, "msg3");
 
@@ -243,6 +246,59 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCacheContinuousQueryReconnectNewServer() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setAutoUnsubscribe(true);
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientCache.query(qry);
+
+        continuousQueryReconnect(client, clientCache, lsnr);
+
+        // Check new server registers listener for reconnected client.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(10);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 10))
+                newSrvCache.put(key, key);
+
+            assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+        }
+
+        cur.close();
+
+        // Check new server does not register listener for closed query.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(5);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 5))
+                newSrvCache.put(key, key);
+
+            assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+        }
+    }
+
+    /**
      * @param client Client.
      * @param clientCache Client cache.
      * @param lsnr Continuous query listener.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 3ae6bb4..3d238af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -209,7 +209,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         for (int i = 0; i < gridCount(); i++) {
             GridContinuousProcessor proc = grid(i).context().continuous();
 
-            assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
+            assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
new file mode 100644
index 0000000..05537c0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Sanity test to verify there are no unnecessary messages on node start.
+ */
+public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static volatile boolean failed;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoCustomEventsOnStart() throws Exception {
+        failed = false;
+
+        for (int i = 0; i < 5; i++) {
+            client = i % 2 == 1;
+
+            startGrid(i);
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     *
+     */
+    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+            failed = true;
+
+            fail("Should not be called.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index c3b9cf4..665294d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,47 +35,118 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODE_CNT = 3;
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(30);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(1000);
 
-        if (gridName.equals(getTestGridName(NODE_CNT - 1)))
-            cfg.setClientMode(true);
+        cfg.setClientMode(client);
 
         return cfg;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
 
-        startGrids(NODE_CNT);
+        super.afterTest();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClient() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        Ignite ignite = startGrid(3);
+
+        checkDeploy(ignite, "service1");
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDeployFromClient() throws Exception {
-        Ignite ignite = ignite(NODE_CNT - 1);
+    public void testDeployFromClientAfterRouterStop1() throws Exception {
+        startGrid(0);
 
-        assertTrue(ignite.configuration().isClientMode());
+        client = true;
 
-        String svcName = "testService";
+        Ignite ignite = startGrid(1);
+
+        client = false;
+
+        startGrid(2);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(3);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClientAfterRouterStop2() throws Exception {
+        startGrid(0);
+
+        client = true;
+
+        Ignite ignite = startGrid(1);
+
+        client = false;
+
+        startGrid(2);
+
+        client = true;
+
+        startGrid(3);
+
+        client = false;
+
+        startGrid(4);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(5);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
+     * @param client Client node.
+     * @param svcName Service name.
+     * @throws Exception If failed.
+     */
+    private void checkDeploy(Ignite client, String svcName) throws Exception {
+        assertTrue(client.configuration().isClientMode());
 
         CountDownLatch latch = new CountDownLatch(1);
 
         DummyService.exeLatch(svcName, latch);
 
-        ignite.services().deployClusterSingleton(svcName, new DummyService());
+        client.services().deployClusterSingleton(svcName, new DummyService());
 
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f878c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index d0943b5..2632d4c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -125,6 +125,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithre
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxTimeoutSelfTest;
+import org.apache.ignite.internal.processors.continuous.IgniteNoCustomEventsOnNodeStart;
 
 /**
  * Test suite.
@@ -253,6 +254,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(CacheEnumOperationsSingleNodeTest.class));
         suite.addTest(new TestSuite(CacheEnumOperationsTest.class));
 
+        suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
+
         return suite;
     }
 }


Mime
View raw message