ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/50] [abbrv] ignite git commit: Fix missed event notification in remove-remove case.
Date Sun, 03 Apr 2016 19:42:36 GMT
Fix missed event notification in remove-remove case.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35fd10d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35fd10d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35fd10d3

Branch: refs/heads/ignite-2407
Commit: 35fd10d32a943c0cf5c9f0ddf901a15ef143a896
Parents: 9cd2f09
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Mar 28 16:31:39 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Mar 28 16:31:39 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../CacheContinuousQueryFactoryFilterTest.java  |   6 +-
 ...acheContinuousQueryRandomOperationsTest.java | 522 +++++++++++++++++--
 ...inuousQueryRandomOperationsTwoNodesTest.java |  28 +
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 6 files changed, 530 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fb6aeef..c5df29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2092,7 +2092,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
                             null,
                             null,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0);
+                            0);
                     }
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..e8a2200 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2115,7 +2115,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter())
+ ']');
                     }
                 }
-                else if (lsnrs != null && updRes.success()) {
+                else if (lsnrs != null && updRes.updateCounter() != 0) {
                     ctx.continuousQueries().onEntryUpdated(
                         lsnrs,
                         entry.key(),
@@ -2427,6 +2427,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
+                    else if (lsnrs != null && updRes.updateCounter() != 0) {
+                        ctx.continuousQueries().onEntryUpdated(
+                            lsnrs,
+                            entry.key(),
+                            updRes.newValue(),
+                            updRes.oldValue(),
+                            entry.isInternal() || !context().userCache(),
+                            entry.partition(),
+                            primary,
+                            false,
+                            updRes.updateCounter(),
+                            topVer);
+                    }
 
                     if (hasNear) {
                         if (primary) {
@@ -2868,7 +2881,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.success()) {
+                        if (lsnrs != null && updRes.updateCounter() != 0) {
                             ctx.continuousQueries().onEntryUpdated(
                                 lsnrs,
                                 entry.key(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
index 6143fa9..55340d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
@@ -95,7 +95,7 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR
             ONHEAP_TIERED,
             false);
 
-        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+        final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
 
         UUID uuid = null;
 
@@ -128,12 +128,12 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR
                 grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
                     .cancelInternalQuery(uuid);
 
-            cache.destroy();
+            grid(0).destroyCache(ccfg.getName());
         }
     }
 
     /** {@inheritDoc} */
-    @Override protected void testContinuousQuery(CacheConfiguration<Object, Object>
ccfg, ContinuousDeploy deploy)
+    @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object>
ccfg, ContinuousDeploy deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index cdf4ffd..b1316ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -31,12 +31,14 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessor;
@@ -56,7 +58,10 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -68,6 +73,8 @@ import org.apache.ignite.transactions.TransactionIsolation;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static javax.cache.event.EventType.CREATED;
+import static javax.cache.event.EventType.REMOVED;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -121,11 +128,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(NODES - 1);
+        startGridsMultiThreaded(getServerNodeCount());
 
         client = true;
 
-        startGrid(NODES - 1);
+        startGrid(getServerNodeCount());
     }
 
     /** {@inheritDoc} */
@@ -190,7 +197,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -203,7 +210,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -216,7 +223,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -229,7 +236,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -242,7 +249,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -255,7 +262,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -268,7 +275,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -281,7 +288,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -294,7 +301,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -307,7 +314,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -320,7 +327,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -333,7 +340,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -346,7 +353,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -359,7 +366,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -372,7 +379,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -385,7 +392,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -398,7 +405,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -411,7 +418,411 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicWithoutBackup() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicWithoutBackupWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            true);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicOffheap() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicOffheapWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            true);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveTxWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedTxWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedAtomicWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            true);
+
+        doTestNotModifyOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestNotModifyOperation(CacheConfiguration ccfg) throws Exception {
+        singleOperation(ccfg);
+        batchOperation(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void singleOperation(CacheConfiguration ccfg) throws Exception {
+        IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
+
+        try {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>
evts =
+                new CopyOnWriteArrayList<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>()
{
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
QueryTestKey,
+                    ? extends QueryTestValue>> events) throws CacheEntryListenerException
{
+                    for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>
e : events)
+                        evts.add(e);
+                }
+            });
+
+            QueryTestKey key = new QueryTestKey(1);
+
+            try (QueryCursor qryCur = cache.query(qry)) {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    log.info("Start iteration: " + i);
+                    // Not events.
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+
+                    // Get events.
+                    cache.put(key, new QueryTestValue(1));
+                    cache.remove(key);
+
+                    // Not events.
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+                    cache.remove(key);
+
+                    // Get events.
+                    cache.put(key, new QueryTestValue(2));
+
+                    // Not events.
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+
+                    // Get events.
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+
+                    // Not events.
+                    cache.remove(key);
+
+                    // Get events.
+                    cache.put(key, new QueryTestValue(3));
+                    cache.put(key, new QueryTestValue(4));
+
+                    // Not events.
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+                    cache.putIfAbsent(key, new QueryTestValue(5));
+                    cache.putIfAbsent(key, new QueryTestValue(5));
+                    cache.putIfAbsent(key, new QueryTestValue(5));
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ?
extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+                    cache.remove(key, new QueryTestValue(5));
+
+                    // Get events.
+                    cache.remove(key, new QueryTestValue(4));
+                    cache.putIfAbsent(key, new QueryTestValue(5));
+
+                    // Not events.
+                    cache.replace(key, new QueryTestValue(3), new QueryTestValue(2));
+                    cache.replace(key, new QueryTestValue(3), new QueryTestValue(2));
+                    cache.replace(key, new QueryTestValue(3), new QueryTestValue(2));
+
+                    // Get events.
+                    cache.replace(key, new QueryTestValue(5), new QueryTestValue(6));
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return evts.size() == 9;
+                        }
+                    }, 5_000);
+
+                    checkSingleEvent(evts.get(0), CREATED, new QueryTestValue(1), null);
+                    checkSingleEvent(evts.get(1), REMOVED, null, new QueryTestValue(1));
+                    checkSingleEvent(evts.get(2), CREATED, new QueryTestValue(2), null);
+                    checkSingleEvent(evts.get(3), REMOVED, null, new QueryTestValue(2));
+                    checkSingleEvent(evts.get(4), CREATED, new QueryTestValue(3), null);
+                    checkSingleEvent(evts.get(5), EventType.UPDATED, new QueryTestValue(4),
new QueryTestValue(3));
+                    checkSingleEvent(evts.get(6), REMOVED, null, new QueryTestValue(4));
+                    checkSingleEvent(evts.get(7), CREATED, new QueryTestValue(5), null);
+                    checkSingleEvent(evts.get(8), EventType.UPDATED, new QueryTestValue(6),
new QueryTestValue(5));
+
+                    cache.remove(key);
+                    cache.remove(key);
+
+                    evts.clear();
+
+                    log.info("Finish iteration: " + i);
+                }
+            }
+        }
+        finally {
+            grid(getClientIndex()).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void batchOperation(CacheConfiguration ccfg) throws Exception {
+        IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
+
+        try {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>
evts =
+                new CopyOnWriteArrayList<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>()
{
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
QueryTestKey,
+                    ? extends QueryTestValue>> events) throws CacheEntryListenerException
{
+                    for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>
e : events)
+                        evts.add(e);
+                }
+            });
+
+            Map<QueryTestKey, QueryTestValue> map = new TreeMap<>();
+
+            for (int i = 0; i < KEYS; i++)
+                map.put(new QueryTestKey(i), new QueryTestValue(i));
+
+            try (QueryCursor qryCur = cache.query(qry)) {
+                for (int i = 0; i < ITERATION_CNT / 2; i++) {
+                    log.info("Start iteration: " + i);
+                    // Not events.
+                    cache.removeAll(map.keySet());
+                    cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue,
? extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue,
? extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+
+                    // Get events.
+                    cache.putAll(map);
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return evts.size() == KEYS;
+                        }
+                    }, 5_000);
+
+                    checkEvents(evts, CREATED);
+
+                    evts.clear();
+
+                    // Not events.
+                    cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue,
? extends Object>)
+                        (Object)new EntrySetValueProcessor(true));
+
+                    U.sleep(100);
+
+                    assertEquals(0, evts.size());
+
+                    // Get events.
+                    cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue,
? extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+
+                    // Not events.
+                    cache.removeAll(map.keySet());
+                    cache.removeAll(map.keySet());
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return evts.size() == KEYS;
+                        }
+                    }, 5_000);
+
+                    checkEvents(evts, REMOVED);
+
+                    evts.clear();
+
+                    log.info("Finish iteration: " + i);
+                }
+            }
+        }
+        finally {
+            grid(getClientIndex()).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param evts Events.
+     * @param evtType Event type.
+     */
+    private void checkEvents(List<CacheEntryEvent<? extends QueryTestKey, ? extends
QueryTestValue>> evts,
+        EventType evtType) {
+        for (int key = 0; key < KEYS; key++) {
+            QueryTestKey keyVal = new QueryTestKey(key);
+
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>
e : evts) {
+                if (e.getKey().equals(keyVal)) {
+                    checkSingleEvent(e,
+                        evtType,
+                        evtType == CREATED ? new QueryTestValue(key) : null,
+                        evtType == REMOVED ? new QueryTestValue(key) : null);
+
+                    keyVal = null;
+
+                    break;
+                }
+            }
+
+            assertNull("Event for key not found.", keyVal);
+        }
+    }
+
+
+    /**
+     * @param event Event.
+     * @param type Event type.
+     * @param val Value.
+     * @param oldVal Old value.
+     */
+    private void checkSingleEvent(
+        CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event,
+        EventType type,
+        QueryTestValue val,
+        QueryTestValue oldVal) {
+        assertEquals(event.getEventType(), type);
+        assertEquals(event.getValue(), val);
+        assertEquals(event.getOldValue(), oldVal);
     }
 
     /**
@@ -424,7 +835,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -437,7 +848,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -450,7 +861,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -463,7 +874,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -476,7 +887,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -489,7 +900,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -502,7 +913,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -515,7 +926,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -528,7 +939,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -541,7 +952,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -554,7 +965,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -567,7 +978,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -580,7 +991,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -593,7 +1004,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, ALL);
+        doTestContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -606,7 +1017,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, SERVER);
+        doTestContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -619,7 +1030,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, CLIENT);
+        doTestContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -627,7 +1038,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param deploy The place where continuous query will be started.
      * @throws Exception If failed.
      */
-    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy
deploy)
+    protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy
deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 
@@ -656,7 +1067,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                 evtsQueues.add(evtsQueue);
 
-                QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+                QueryCursor<?> cur = grid(getClientIndex()).cache(ccfg.getName()).query(qry);
 
                 curs.add(cur);
             }
@@ -674,12 +1085,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                 evtsQueues.add(evtsQueue);
 
-                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+                QueryCursor<?> cur = grid(rnd.nextInt(getServerNodeCount())).cache(ccfg.getName()).query(qry);
 
                 curs.add(cur);
             }
             else {
-                for (int i = 0; i < NODES - 1; i++) {
+                for (int i = 0; i <= getServerNodeCount(); i++) {
                     ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
                     final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new
ArrayBlockingQueue<>(50_000);
@@ -708,7 +1119,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (i % 20 == 0)
                         log.info("Iteration: " + i);
 
-                    for (int idx = 0; idx < NODES; idx++)
+                    for (int idx = 0; idx < getServerNodeCount(); idx++)
                         randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
                 }
             }
@@ -723,6 +1134,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     }
 
     /**
+     * @return Client node index.
+     */
+    private int getClientIndex() {
+        return getServerNodeCount() - 1;
+    }
+
+    /**
+     * @return Count nodes.
+     */
+    protected int getServerNodeCount() {
+        return NODES;
+    }
+
+    /**
      * @param rnd Random generator.
      * @param evtsQueues Events queue.
      * @param expData Expected cache data.
@@ -1272,9 +1697,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      */
     public static class QueryTestValue implements Serializable {
         /** */
+        @GridToStringInclude
         protected final Integer val1;
 
         /** */
+        @GridToStringInclude
         protected final String val2;
 
         /**
@@ -1323,6 +1750,16 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         /** */
         private boolean retOld;
 
+        /** */
+        private boolean skipModify;
+
+        /**
+         * @param skipModify If {@code true} then entry will not be modified.
+         */
+        public EntrySetValueProcessor(boolean skipModify) {
+            this.skipModify = skipModify;
+        }
+
         /**
          * @param val Value to set.
          * @param retOld Return old value flag.
@@ -1334,6 +1771,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
         /** {@inheritDoc} */
         @Override public Object process(MutableEntry<Object, Object> e, Object... args)
{
+            if (skipModify)
+                return null;
+
             Object old = retOld ? e.getValue() : null;
 
             if (val != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
new file mode 100644
index 0000000..ebb7861
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTwoNodesTest extends CacheContinuousQueryRandomOperationsTest
{
+    /** {@inheritDoc} */
+    @Override protected int getServerNodeCount() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/35fd10d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 3487421..b2cb50b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
@@ -106,6 +107,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
+        suite.addTestSuite(CacheContinuousQueryRandomOperationsTwoNodesTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
         suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);


Mime
View raw message