ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2515 Minor fixes.
Date Thu, 11 Feb 2016 15:34:53 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2515 301abcfa6 -> 51cd38378


IGNITE-2515 Minor fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51cd3837
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51cd3837
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51cd3837

Branch: refs/heads/ignite-2515
Commit: 51cd38378a424ab69408442d4b5e91b187905721
Parents: 301abcf
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Feb 11 18:35:03 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Feb 11 18:35:03 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  21 +-
 ...acheContinuousQueryRandomOperationsTest.java | 228 +++++++++++++++----
 2 files changed, 193 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/51cd3837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index afc3f24..d2150f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -167,25 +167,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         int partId,
         long updCntr,
         AffinityTopologyVersion topVer) {
-        assert lsnrs != null;
+        boolean recordIgniteEvt = !(key.internal() || !cctx.userCache())
+            && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-        for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
-            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                cctx.cacheId(),
-                UPDATED,
-                key,
-                null,
-                null,
-                lsnr.keepBinary(),
-                partId,
-                updCntr,
-                topVer);
-
-            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
-
-            lsnr.skipUpdateEvent(evt, topVer, true, false);
-        }
+        skipUpdateEvent(lsnrs, key, partId, updCntr, true, recordIgniteEvt, topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/51cd3837/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 4eb133e..3cd58d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -18,12 +18,15 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
@@ -143,6 +146,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicReplicated() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
             0,
@@ -156,6 +172,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicReplicatedAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicReplicatedClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
             0,
@@ -182,6 +211,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicOffheapValuesAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicOffheapValuesClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -208,6 +250,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicOffheapTieredAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicOffheapTieredClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -234,6 +289,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicNoBackupsClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             0,
@@ -260,6 +328,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testTxAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxExplicit() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -338,6 +419,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testTxOffheapValuesAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxOffheapValuesExplicit() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -377,6 +471,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testTxOffheapTieredAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxOffheapTieredClient() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -416,6 +523,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testTxNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxNoBackupsExplicit() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             0,
@@ -441,21 +561,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
     /**
      * @param ccfg Cache configuration.
-     * @param client Client.
+     * @param client Client. If {@code null} then listener will be registered on all nodes.
      * @param expTx Explicit tx.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean client, boolean expTx)
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, Boolean client, boolean expTx)
         throws Exception {
         ignite(0).createCache(ccfg);
 
         try {
-            IgniteCache<Object, Object> cache;
+            IgniteCache<Object, Object> cache = null;
 
-            if (client)
-                cache = ignite(NODES - 1).cache(ccfg.getName());
-            else
-                cache = ignite(0).cache(ccfg.getName());
+            if (client != null) {
+                if (client)
+                    cache = ignite(NODES - 1).cache(ccfg.getName());
+                else
+                    cache = ignite(0).cache(ccfg.getName());
+            }
 
             long seed = System.currentTimeMillis();
 
@@ -463,19 +585,43 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
             log.info("Random seed: " + seed);
 
-            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
             final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
                 new ArrayBlockingQueue<>(50_000);
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                    for (CacheEntryEvent<?, ?> evt : evts)
-                        evtsQueue.add(evt);
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            if (cache != null) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
+                    }
+                });
+
+                QueryCursor<?> cur = cache.query(qry);
+
+                curs.add(cur);
+            }
+            else {
+                for (int i = 0; i < NODES - 1; i++) {
+                    ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+
+                    QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
+
+                    curs.add(cur);
                 }
-            });
 
-            QueryCursor<?> cur = cache.query(qry);
+                cache = ignite(ThreadLocalRandom.current().nextInt(NODES - 1)).cache(ccfg.getName());
+            }
 
             ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
 
@@ -486,11 +632,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (i % 100 == 0)
                         log.info("Iteration: " + i);
 
-                    randomUpdate(rnd, evtsQueue, expData, partCntr, cache, expTx);
+                    randomUpdate(rnd, evtsQueue, expData, partCntr, cache, expTx, curs.size());
                 }
             }
             finally {
-                cur.close();
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
             }
         }
         finally {
@@ -505,6 +652,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param partCntr Partition counter.
      * @param cache Cache.
      * @param expTx Explicit TX.
+     * @param qryCnt Query count.
      * @throws Exception If failed.
      */
     private void randomUpdate(
@@ -513,7 +661,8 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         ConcurrentMap<Object, Object> expData,
         Map<Integer, Long> partCntr,
         IgniteCache<Object, Object> cache,
-        boolean expTx)
+        boolean expTx,
+        int qryCnt)
         throws Exception {
         Object key = new QueryTestKey(rnd.nextInt(KEYS));
         Object newVal = value(rnd);
@@ -540,7 +689,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                     expData.put(key, newVal);
 
@@ -555,7 +704,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                     expData.put(key, newVal);
 
@@ -570,7 +719,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt);
 
                     expData.remove(key);
 
@@ -585,7 +734,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt);
 
                     expData.remove(key);
 
@@ -600,7 +749,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                     expData.put(key, newVal);
 
@@ -615,7 +764,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal);
+                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, null, oldVal, qryCnt);
 
                     expData.remove(key);
 
@@ -631,7 +780,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (oldVal == null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null, qryCnt);
 
                         expData.put(key, newVal);
                     }
@@ -650,7 +799,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (oldVal == null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, null, qryCnt);
 
                         expData.put(key, newVal);
                     }
@@ -669,7 +818,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (oldVal != null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                         expData.put(key, newVal);
                     }
@@ -688,7 +837,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (oldVal != null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                         expData.put(key, newVal);
                     }
@@ -712,7 +861,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                             updatePartitionCounter(cache, key, partCntr);
 
-                            waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal);
+                            waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), key, newVal, oldVal, qryCnt);
 
                             expData.put(key, newVal);
                         }
@@ -779,6 +928,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      * @param key Key.
      * @param val Value.
      * @param oldVal Old value.
+     * @param qryCnt Query count.
      * @throws Exception If failed.
      */
     private void waitAndCheckEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
@@ -786,24 +936,26 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         Affinity<Object> aff,
         Object key,
         Object val,
-        Object oldVal) throws Exception {
+        Object oldVal, int qryCnt) throws Exception {
         if (val == null && oldVal == null) {
             checkNoEvent(evtsQueue);
 
             return;
         }
 
-        CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+        for (int i = 0; i < qryCnt; i++) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
 
-        assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
-        assertEquals(key, evt.getKey());
-        assertEquals(val, evt.getValue());
-        assertEquals(oldVal, evt.getOldValue());
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
 
-        Long cntr = partCntrs.get(aff.partition(key));
+            Long cntr = partCntrs.get(aff.partition(key));
 
-        assertNotNull(cntr);
-        assertEquals(cntr, evt.unwrap(Long.class));
+            assertNotNull(cntr);
+            assertEquals(cntr, evt.unwrap(Long.class));
+        }
     }
 
     /**


Mime
View raw message