ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-5075
Date Fri, 26 May 2017 13:27:59 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 ef1105d12 -> 3858bb58f


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ea8ae04
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ea8ae04
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ea8ae04

Branch: refs/heads/ignite-5075
Commit: 5ea8ae045282e7c07b6589612cf1a73f01329035
Parents: 9585dd3
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 26 15:28:33 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 26 16:27:11 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../cache/CacheGroupInfrastructure.java         | 123 +++++++++++++++++--
 .../processors/cache/ClusterCachesInfo.java     |   6 +-
 .../processors/cache/GridCacheContext.java      |   8 --
 .../processors/cache/GridCacheProcessor.java    |   7 +-
 .../cache/database/CacheDataRowAdapter.java     |   8 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   1 -
 .../continuous/CacheContinuousQueryManager.java |  31 +++--
 .../processors/cache/IgniteCacheGroupsTest.java |  98 ++++++++++++++-
 9 files changed, 245 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index c3311a8..d3be472 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -379,7 +379,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
             else {
                 startCache = cctx.cacheContext(cacheDesc.cacheId()) == null &&
-                    CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+                    CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter());
             }
 
             if (startCache) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index aed96e4..11efd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -103,6 +104,9 @@ public class CacheGroupInfrastructure {
     private volatile List<GridCacheContext> caches;
 
     /** */
+    private volatile List<GridCacheContext> contQryCaches;
+
+    /** */
     private final IgniteLogger log;
 
     /** */
@@ -129,6 +133,12 @@ public class CacheGroupInfrastructure {
     /** ReuseList instance this group is associated with */
     private final ReuseList reuseList;
 
+    /** */
+    private boolean drEnabled;
+
+    /** */
+    private boolean qryEnabled;
+
     /**
      * @param grpId Group ID.
      * @param ctx Context.
@@ -263,8 +273,14 @@ public class CacheGroupInfrastructure {
 
         assert add : cctx.name();
 
+        if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
+            qryEnabled = true;
+
+        if (!drEnabled && cctx.isDrEnabled())
+            drEnabled = true;
+
         this.caches = caches;
-    }
+   }
 
     /**
      * @param cctx Cache context.
@@ -280,9 +296,39 @@ public class CacheGroupInfrastructure {
                 assert sharedGroup() || caches.size() == 1 : caches.size();
 
                 it.remove();
+
+                break;
             }
         }
 
+        if (QueryUtils.isEnabled(cctx.config())) {
+            boolean qryEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (QueryUtils.isEnabled(caches.get(i).config())) {
+                    qryEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.qryEnabled = qryEnabled;
+        }
+
+        if (cctx.isDrEnabled()) {
+            boolean drEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (caches.get(i).isDrEnabled()) {
+                    drEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.drEnabled = drEnabled;
+        }
+
         this.caches = caches;
     }
 
@@ -423,19 +469,25 @@ public class CacheGroupInfrastructure {
         }
     }
 
-    // TODO IGNITE-5075
+    /**
+     * @return {@code True} if contains cache with query indexing enabled.
+     */
     public boolean queriesEnabled() {
-        return QueryUtils.isEnabled(ccfg);
+        return qryEnabled;
     }
 
-    // TODO IGNITE-5075 see GridCacheContext#allowFastEviction
+    /**
+     * @return {@code True} if fast eviction is allowed.
+     */
     public boolean allowFastEviction() {
-        return false;
+        return ctx.database().persistenceEnabled() && !queriesEnabled();
     }
 
-    // TODO IGNITE-5075.
+    /**
+     * @return {@code True} in case replication is enabled.
+     */
     public boolean isDrEnabled() {
-        return false;
+        return drEnabled;
     }
 
     /**
@@ -533,6 +585,13 @@ public class CacheGroupInfrastructure {
     }
 
     /**
+     * @return Cache node filter.
+     */
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return ccfg.getNodeFilter();
+    }
+
+    /**
      * @return Configured user objects which should be initialized/stopped on group start/stop.
      */
     Collection<?> configuredUserObjects() {
@@ -668,6 +727,47 @@ public class CacheGroupInfrastructure {
     }
 
     /**
+     * @param cctx Cache context.
+     */
+    public void addCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                contQryCaches = new ArrayList<>();
+
+            contQryCaches.add(cctx);
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                return;
+
+            contQryCaches.remove(cctx);
+
+            if (contQryCaches.isEmpty())
+                contQryCaches = null;
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
      * @param cacheId ID of cache initiated counter update.
      * @param part Partition number.
      * @param cntr Counter.
@@ -682,12 +782,15 @@ public class CacheGroupInfrastructure {
         if (isLocal())
             return;
 
-        List<GridCacheContext> caches = this.caches;
+        List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+        if (contQryCaches == null)
+            return;
 
         CounterSkipContext skipCtx = null;
 
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
+        for (int i = 0; i < contQryCaches.size(); i++) {
+            GridCacheContext cctx = contQryCaches.get(i);
 
             if (cacheId != cctx.cacheId())
                 skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 370f07b..25f70da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -820,7 +820,9 @@ class ClusterCachesInfo {
                     desc = desc0;
                 }
 
-                if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+                if (locCfg != null ||
+                    joinDiscoData.startCaches() ||
+                    CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter()))
                     locJoinStartCaches.add(new T2<>(desc, nearCfg));
             }
         }
@@ -1015,7 +1017,7 @@ class ClusterCachesInfo {
 
         assert old == null : old;
 
-        ctx.discovery().addCacheGroup(grpDesc, startedCacheCfg.getNodeFilter(), startedCacheCfg.getCacheMode());
+        ctx.discovery().addCacheGroup(grpDesc, grpDesc.config().getNodeFilter(), startedCacheCfg.getCacheMode());
 
         if (exchActions != null)
             exchActions.addCacheGroupToStart(grpDesc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3341d4c..aaa67ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.lang.GridFunc;
@@ -2016,13 +2015,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     * @return {@code True} if fast eviction is allowed.
-     */
-    public boolean allowFastEviction() {
-        return shared().database().persistenceEnabled() && !QueryUtils.isEnabled(cacheCfg);
-    }
-
-    /**
      * @param part Partition.
      * @param affNodes Affinity nodes.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e28edfb..3a380a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -836,12 +836,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
                         CacheConfiguration c = desc.cacheConfiguration();
-                        IgnitePredicate filter = c.getNodeFilter();
+                        IgnitePredicate filter = desc.groupDescriptor().config().getNodeFilter();
 
                         if (c.getName().equals(conf.getName()) &&
                             ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) ||
                                 CU.isSystemCache(c.getName()))) {
-
                             tmpCacheCfg.add(c);
 
                             break;
@@ -1820,7 +1819,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         if (started != null) {
             for (DynamicCacheDescriptor desc : started) {
-                IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter();
+                IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
 
                 if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
                     prepareCacheStart(
@@ -1868,7 +1867,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ccfg.setNearConfiguration(null);
         }
-        else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter()))
+        else if (CU.affinityNode(ctx.discovery().localNode(), grpDesc.config().getNodeFilter()))
             affNode = true;
         else {
             affNode = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 75909d4..7255a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -214,9 +214,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
             incomplete = null;
         }
 
-        if (coctx == null)
-            // TODO IGNITE-5075 Possible null pointer in case cacheId is not stored
+        if (coctx == null) {
+            // coctx can be null only when grp is null too, this means that
+            // we are in process of eviction and cacheId is mandatory part of data.
+            assert cacheId != 0;
+
             coctx = sharedCtx.cacheContext(cacheId).cacheObjectContext();
+        }
 
         // Read key.
         if (key == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1a36e4d..4a49569 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -740,7 +740,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             if (ctx.pageStore() != null) {
                 try {
-                    // TODO IGNITE-5075.
                     ctx.pageStore().onPartitionCreated(grp.groupId(), p);
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fc39b6d..e1a5179 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -644,7 +644,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         hnd.localCache(cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
-            F.nodeForNodeId(cctx.localNodeId()) : cctx.config().getNodeFilter();
+            F.nodeForNodeId(cctx.localNodeId()) : cctx.group().nodeFilter();
 
         assert pred != null : cctx.config();
 
@@ -820,16 +820,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 intLsnrCnt.incrementAndGet();
         }
         else {
-            added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
+            synchronized (this) {
+                added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
 
-            if (added) {
-                lsnrCnt.incrementAndGet();
+                if (added) {
+                    int cnt = lsnrCnt.incrementAndGet();
 
-                lsnr.onExecution();
+                    if (cctx.group().sharedGroup() && cnt == 1)
+                        cctx.group().addCacheWithContinuousQuery(cctx);
+                }
             }
+
+            if (added)
+                lsnr.onExecution();
         }
 
-        return added ? GridContinuousHandler.RegisterStatus.REGISTERED : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
+        return added ? GridContinuousHandler.RegisterStatus.REGISTERED :
+            GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
     }
 
     /**
@@ -847,11 +854,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
         else {
-            if ((lsnr = lsnrs.remove(id)) != null) {
-                lsnrCnt.decrementAndGet();
+            synchronized (this) {
+                if ((lsnr = lsnrs.remove(id)) != null) {
+                    int cnt = lsnrCnt.decrementAndGet();
 
-                lsnr.onUnregister();
+                    if (cctx.group().sharedGroup() && cnt == 0)
+                        cctx.group().removeCacheWithContinuousQuery(cctx);
+                }
             }
+
+            if (lsnr != null)
+                lsnr.onUnregister();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ea8ae04/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 4529f77..1cc8999 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -35,11 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.CacheException;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
 import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -56,6 +57,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -2419,6 +2421,98 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueriesMultipleGroups1() throws Exception {
+        continuousQueriesMultipleGroups(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousQueriesMultipleGroups2() throws Exception {
+        continuousQueriesMultipleGroups(4);
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @throws Exception If failed.
+     */
+    private void continuousQueriesMultipleGroups(int srvs) throws Exception {
+        Ignite srv0 = startGrids(srvs);
+
+        client = true;
+
+        Ignite client = startGrid(srvs);
+
+        client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
+        client.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1, false));
+        client.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false));
+
+        client.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1, false));
+        client.createCache(cacheConfiguration(GROUP2, "c5", PARTITIONED, ATOMIC, 1, false));
+        client.createCache(cacheConfiguration(GROUP2, "c6", PARTITIONED, TRANSACTIONAL, 1, false));
+
+        client.createCache(cacheConfiguration(null, "c7", PARTITIONED, ATOMIC, 1, false));
+        client.createCache(cacheConfiguration(null, "c8", PARTITIONED, TRANSACTIONAL, 1, false));
+
+        String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"};
+
+        AtomicInteger c1 = registerListener(client, "c1");
+
+        for (String cache : cacheNames)
+            srv0.cache(cache).put(1, 1);
+
+        waitForEvents(c1, 1);
+
+        for (String cache : cacheNames)
+            srv0.cache(cache).put(1, 1);
+
+        waitForEvents(c1, 1);
+    }
+
+    /**
+     * @param cntr Counter.
+     * @param expEvts Expected events number.
+     * @throws Exception If failed.
+     */
+    private void waitForEvents(final AtomicInteger cntr, final int expEvts) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                if (cntr.get() < expEvts)
+                    log.info("Wait for events [rcvd=" + cntr.get() + ", exp=" + expEvts + ']');
+
+                return false;
+            }
+        }, 5000);
+
+        assertEquals(expEvts, cntr.get());
+        assertTrue(cntr.compareAndSet(expEvts, 0));
+    }
+
+    /**
+     * @param node Node.
+     * @param cacheName Cache name.
+     * @return Received events counter.
+     */
+    private AtomicInteger registerListener(Ignite node, String cacheName) {
+        ContinuousQuery qry = new ContinuousQuery();
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener() {
+            @Override public void onUpdated(Iterable iterable) {
+                for (Object evt : iterable)
+                    cntr.incrementAndGet();
+            }
+        });
+
+        node.cache(cacheName).query(qry);
+
+        return cntr;
+    }
+
+    /**
      *
      */
     static class Mapper1 implements AffinityKeyMapper {


Mime
View raw message