ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3038
Date Fri, 27 May 2016 09:33:09 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3038 f70286d47 -> 6e0126122


ignite-3038


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e012612
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e012612
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e012612

Branch: refs/heads/ignite-3038
Commit: 6e0126122d4b626a2eb0e19e17abc441ef4ea4fb
Parents: f70286d
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 27 11:43:30 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 27 12:30:29 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |   2 -
 .../binary/CacheObjectBinaryProcessorImpl.java  |  30 ++---
 .../continuous/CacheContinuousQueryManager.java |  13 ++
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 129 ++++++++++---------
 .../service/GridServiceProcessor.java           |  25 ++--
 ...eClientReconnectContinuousProcessorTest.java |  60 ++++++++-
 .../IgniteNoCustomEventsOnNodeStart.java        |  80 ++++++++++++
 .../service/GridServiceClientNodeTest.java      |  36 +++++-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 11 files changed, 293 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 9a51ed8..cb5b528 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -260,8 +260,6 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
                 assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()):
                     "Received cache entry update for system marshaller cache: " + evt;
 
-                log.info("Marshaller cache: " + evt);
-
                 if (evt.getOldValue() == null) {
                     String fileName = evt.getKey() + ".classname";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index fe73f8a..1ef09bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -154,9 +155,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** Metadata updates collected before metadata cache is initialized. */
     private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
 
-    /** */
-    private UUID metaCacheQryId;
-
     /**
      * @param ctx Kernal context.
      */
@@ -260,6 +258,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        if (clientNode && !ctx.isDaemon()) {
+            CacheContinuousQueryManager.registerStaticInternalQuery(ctx,
+                CU.UTILITY_CACHE_NAME,
+                new MetaDataEntryListener(),
+                new MetaDataEntryFilter(),
+                false);
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
@@ -276,13 +285,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         if (clientNode) {
             assert !metaDataCache.context().affinityNode();
 
-            metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
-                new MetaDataEntryListener(),
-                new MetaDataEntryFilter(),
-                false,
-                true,
-                false);
-
             while (true) {
                 ClusterNode oldestSrvNode =
                     CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
@@ -363,14 +365,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (metaCacheQryId != null)
-            metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
-    }
-
     /**
      * @param key Metadata key.
      * @param newMeta Metadata.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 7320d5f..7714984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -491,6 +491,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             false);
     }
 
+    /**
+     * Registers routine info to be sent in discovery data during this node join
+     * (to be used for internal queries started from client nodes).
+     *
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @param locLsnr Local listener.
+     * @param rmtFilter Remote filter.
+     * @param ignoreClsNotFound Ignore class not found error flag.
+     * @return Continuous routine ID.
+     * @throws IgniteCheckedException If failed.
+     */
     public static UUID registerStaticInternalQuery(
         GridKernalContext ctx,
         String cacheName,
@@ -1245,6 +1257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
          */
         public CacheEntryEventImpl(Cache src, EventType evtType, Object key, Object val) {
             super(src, evtType);
+
             this.key = key;
             this.val = val;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index cadf1a9..99de507 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -34,6 +35,12 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface IgniteCacheObjectProcessor extends GridProcessor {
     /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
+
+    /**
      * @see GridComponent#onKernalStart()
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 75d65de..6630c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -235,6 +235,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0ac04e2..3357d7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -251,17 +250,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
-                    if (!snd.id().equals(ctx.localNodeId())) {
-                        UUID routineId = msg.routineId();
+                    UUID routineId = msg.routineId();
 
-                        unregisterRemote(routineId);
-
-                        if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
+                    unregisterRemote(routineId);
 
-                            if (clientRoutineMap != null)
-                                clientRoutineMap.remove(msg.routineId());
-                        }
+                    for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) {
+                        if (clientInfo.remove(msg.routineId()) != null)
+                            break;
                     }
                 }
             });
@@ -312,6 +307,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.marshallerContext().onContinuousProcessorStarted(ctx);
 
+        ctx.cacheObjects().onContinuousProcessorStarted(ctx);
+
         ctx.service().onContinuousProcessorStarted(ctx);
 
         if (log.isDebugEnabled())
@@ -397,11 +394,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        log.info("collectDiscoveryData [node=" + nodeId +
-            ", loc=" + ctx.localNodeId() +
-            ", locInfos=" + locInfos +
-            ", clientInfos=" + clientInfos +
-            ']');
+        if (log.isDebugEnabled()) {
+            log.debug("collectDiscoveryData [node=" + nodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos +
+                ']');
+        }
 
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
@@ -451,11 +450,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
         DiscoveryData data = (DiscoveryData)obj;
 
-        log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
-            ", rmtNodeId=" + rmtNodeId +
-            ", loc=" + ctx.localNodeId() +
-            ", data=" + data +
-            ']');
+        if (log.isDebugEnabled()) {
+            log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
+                ", rmtNodeId=" + rmtNodeId +
+                ", loc=" + ctx.localNodeId() +
+                ", data=" + data +
+                ']');
+        }
 
         if (!ctx.isDaemon() && data != null) {
             for (DiscoveryDataItem item : data.items) {
@@ -484,29 +485,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
                 UUID clientNodeId = entry.getKey();
 
-                Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+                if (!ctx.localNodeId().equals(clientNodeId)) {
+                    Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
 
-                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
-                    UUID routineId = e.getKey();
-                    LocalRoutineInfo info = e.getValue();
+                    for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                        UUID routineId = e.getKey();
+                        LocalRoutineInfo info = e.getValue();
 
-                    try {
-                        if (info.prjPred != null)
-                            ctx.resource().injectGeneric(info.prjPred);
-
-                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                            if (registerHandler(clientNodeId,
-                                routineId,
-                                info.hnd,
-                                info.bufSize,
-                                info.interval,
-                                info.autoUnsubscribe,
-                                false))
-                                info.hnd.onListenerRegistered(routineId, ctx);
+                        try {
+                            if (info.prjPred != null)
+                                ctx.resource().injectGeneric(info.prjPred);
+
+                            if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                                if (registerHandler(clientNodeId,
+                                    routineId,
+                                    info.hnd,
+                                    info.bufSize,
+                                    info.interval,
+                                    info.autoUnsubscribe,
+                                    false))
+                                    info.hnd.onListenerRegistered(routineId, ctx);
+                            }
+                        }
+                        catch (IgniteCheckedException err) {
+                            U.error(log, "Failed to register continuous handler.", err);
                         }
-                    }
-                    catch (IgniteCheckedException err) {
-                        U.error(log, "Failed to register continuous handler.", err);
                     }
                 }
 
@@ -564,6 +567,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Registers routine info to be sent in discovery data during this node join
+     * (to be used for internal queries started from client nodes).
+     *
      * @param hnd Handler.
      * @param bufSize Buffer size.
      * @param interval Time interval.
@@ -583,8 +589,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         locInfos.put(routineId, routineInfo);
 
-        registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
-
         registerMessageListener(hnd);
 
         return routineId;
@@ -746,21 +750,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (doStop) {
-            // Unregister routine locally.
-            LocalRoutineInfo routine = locInfos.remove(routineId);
-
-            // Finish if routine is not found (wrong ID is provided).
-            if (routine == null) {
-                stopFuts.remove(routineId);
-
-                fut.onDone();
-
-                return fut;
-            }
-
-            // Unregister handler locally.
-            unregisterHandler(routineId, routine.hnd, true);
-
             try {
                 ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
             }
@@ -868,16 +857,28 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
 
-        log.info("onDisconnected [rmtInfos=" + rmtInfos + ", clientInfos=" + clientInfos + ']');
+        if (log.isDebugEnabled()) {
+            log.debug("onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
+
+        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+            RemoteRoutineInfo info = e.getValue();
 
-        for (UUID rmtId : rmtInfos.keySet())
-            unregisterRemote(rmtId);
+            if (!ctx.localNodeId().equals(info.nodeId) || info.autoUnsubscribe)
+                unregisterRemote(e.getKey());
+        }
 
         rmtInfos.clear();
 
         clientInfos.clear();
 
-        log.info("after onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']');
+        if (log.isDebugEnabled()) {
+            log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
+                ", locInfos=" + locInfos +
+                ", clientInfos=" + clientInfos + ']');
+        }
     }
 
     /**
@@ -1103,6 +1104,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (doRegister) {
+            if (log.isDebugEnabled())
+                log.debug("Register handler: [nodeId=" + nodeId + ", routineId=" + routineId + ", info=" + info + ']');
+
             if (interval > 0) {
                 IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
                     @SuppressWarnings("ConstantConditions")
@@ -1217,6 +1221,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             stopLock.unlock();
         }
 
+        if (log.isDebugEnabled())
+            log.debug("unregisterRemote [routineId=" + routineId + ", loc=" + loc + ", rmt=" + remote + ']');
+
         if (remote != null)
             unregisterHandler(routineId, remote.hnd, false);
         else if (loc != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 15a0685..7734d79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -57,8 +57,6 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -210,17 +208,18 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
-            boolean affNode = cache.context().affinityNode();
+            if (!ctx.clientNode()) {
+                assert cache.context().affinityNode();
 
-            // For clients continuous query is statically registered.
-            if (affNode) {
                 cache.context().continuousQueries().executeInternalQuery(new ServiceEntriesListener(),
                     null,
-                    affNode,
                     true,
-                    !affNode);
+                    true,
+                    false);
             }
             else {
+                assert !ctx.isDaemon();
+
                 ctx.closure().runLocalSafe(new Runnable() {
                     @Override public void run() {
                         try {
@@ -234,8 +233,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         }
                     }
                 });
-            }        assert !ctx.isDaemon();
-
+            }
         }
         finally {
             if (ctx.deploy().enabled())
@@ -1322,12 +1320,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-    private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
+    /**
+     * @param evts Update events.
+     */
+    private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
         boolean firstTime = true;
 
-        for (CacheEntryEvent<?, ?> e : deps) {
-            log.info("Service listener: " + e);
-
+        for (CacheEntryEvent<?, ?> e : evts) {
             if (e.getKey() instanceof GridServiceDeploymentKey) {
                 if (firstTime) {
                     markCompatibilityStateAsUsed();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index 4c44adc..0ff5883 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -125,6 +125,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @param stopFromClient If {@code true} stops listener from client node, otherwise from server.
      * @throws Exception If failed.
      */
     private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
@@ -178,9 +179,11 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
         assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
         assertTrue(latch.await(5000, MILLISECONDS));
 
-        log.info("Stop listen, should not get remote messages anymore.");
+        Ignite stopFrom = (stopFromClient ? client : srv);
 
-        (stopFromClient ? client : srv).message().stopRemoteListen(opId);
+        log.info("Stop listen, should not get remote messages anymore [from=" + stopFrom.name() + ']');
+
+        stopFrom.message().stopRemoteListen(opId);
 
         srv.message().send(topic, "msg3");
 
@@ -243,6 +246,59 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCacheContinuousQueryReconnectNewServer() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setAutoUnsubscribe(true);
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientCache.query(qry);
+
+        continuousQueryReconnect(client, clientCache, lsnr);
+
+        // Check new server registers listener for reconnected client.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(10);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 10))
+                newSrvCache.put(key, key);
+
+            assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+        }
+
+        cur.close();
+
+        // Check new server does not register listener for closed query.
+        try (Ignite newSrv = startGrid(serverCount() + 1)) {
+            awaitPartitionMapExchange();
+
+            lsnr.latch = new CountDownLatch(5);
+
+            IgniteCache<Object, Object> newSrvCache = newSrv.cache(null);
+
+            for (Integer key : primaryKeys(newSrvCache, 5))
+                newSrvCache.put(key, key);
+
+            assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+        }
+    }
+
+    /**
      * @param client Client.
      * @param clientCache Client cache.
      * @param lsnr Continuous query listener.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
new file mode 100644
index 0000000..c268015
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static volatile boolean failed;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoCustomEventsOnStart() throws Exception {
+        failed = false;
+
+        for (int i = 0; i < 5; i++) {
+            client = i % 2 == 1;
+
+            startGrid(i);
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     *
+     */
+    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+            failed = true;
+
+            fail("Should not be called.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
index 6d869ae..8f92195 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceClientNodeTest.java
@@ -73,7 +73,7 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testDeployAfterServerStop() throws Exception {
+    public void testDeployFromClientAfterRouterStop1() throws Exception {
         startGrid(0);
 
         client = true;
@@ -97,6 +97,40 @@ public class GridServiceClientNodeTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDeployFromClientAfterRouterStop2() throws Exception {
+        startGrid(0);
+
+        client = true;
+
+        Ignite ignite = startGrid(1);
+
+        client = false;
+
+        startGrid(2);
+
+        client = true;
+
+        startGrid(3);
+
+        client = false;
+
+        startGrid(4);
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        checkDeploy(ignite, "service1");
+
+        startGrid(5);
+
+        for (int i = 0; i < 10; i++)
+            checkDeploy(ignite, "service2-" + i);
+    }
+
+    /**
      * @param client Client node.
      * @param svcName Service name.
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e012612/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index d0943b5..2632d4c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -125,6 +125,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithre
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxTimeoutSelfTest;
+import org.apache.ignite.internal.processors.continuous.IgniteNoCustomEventsOnNodeStart;
 
 /**
  * Test suite.
@@ -253,6 +254,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(CacheEnumOperationsSingleNodeTest.class));
         suite.addTest(new TestSuite(CacheEnumOperationsTest.class));
 
+        suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
+
         return suite;
     }
 }


Mime
View raw message