ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [28/53] [abbrv] ignite git commit: IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". This close #930.
Date Fri, 19 Aug 2016 11:48:45 GMT
IGNITE-3272 Fixed "Memory consumption in ContinuousQueryHandler". This close #930.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff3e00ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff3e00ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff3e00ca

Branch: refs/heads/ignite-3299
Commit: ff3e00caa892a7399622711b620fcb4dcfbbfb56
Parents: 151dfa7
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Aug 10 16:21:52 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Aug 10 16:21:52 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   5 +
 .../internal/GridMessageListenHandler.java      |   5 +
 .../continuous/CacheContinuousQueryEntry.java   |  16 +-
 .../continuous/CacheContinuousQueryHandler.java |  85 ++++++---
 .../continuous/GridContinuousHandler.java       |   5 +
 .../continuous/GridContinuousProcessor.java     |  11 +-
 .../continuous/GridContinuousQueryBatch.java    |  47 +++++
 ...niteCacheContinuousQueryBackupQueueTest.java | 184 ++++++++++++++++++-
 8 files changed, 318 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 19bf1a7..b4b1e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -402,6 +402,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void onNodeLeft() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 0ac6877..2b8041d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -224,6 +224,11 @@ public class GridMessageListenHandler implements GridContinuousHandler
{
     }
 
     /** {@inheritDoc} */
+    @Override public void onNodeLeft() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeBoolean(depEnabled);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 74f930a..366a1e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -200,6 +200,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    void topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Size include this event and filtered.
+     */
+    public int size() {
+        return filteredEvts != null ? filteredEvts.size() + 1 : 1;
+    }
+
+    /**
      * @return If entry filtered then will return light-weight <i><b>new entry</b></i>
without values and key
      * (avoid to huge memory consumption), otherwise {@code this}.
      */
@@ -208,7 +222,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
             return this;
 
         CacheContinuousQueryEntry e =
-            new CacheContinuousQueryEntry(cacheId, evtType, null, null, null, keepBinary,
part, updateCntr, topVer);
+            new CacheContinuousQueryEntry(cacheId, null, null, null, null, keepBinary, part,
updateCntr, null);
 
         e.flags = flags;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 5012569..7b3b47b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -64,8 +64,8 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
-import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
@@ -132,7 +132,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient boolean skipPrimaryCheck;
 
     /** Backup queue. */
-    private transient Collection<CacheContinuousQueryEntry> backupQueue;
+    private transient volatile Collection<CacheContinuousQueryEntry> backupQueue;
 
     /** */
     private boolean locCache;
@@ -430,33 +430,48 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs)
{
-                Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
+                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
 
-                while (it.hasNext()) {
-                    CacheContinuousQueryEntry backupEntry = it.next();
+                if (backupQueue0 != null) {
+                    Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
 
-                    Long updateCntr = updateCntrs.get(backupEntry.partition());
+                    while (it.hasNext()) {
+                        CacheContinuousQueryEntry backupEntry = it.next();
 
-                    if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
-                        it.remove();
+                        Long updateCntr = updateCntrs.get(backupEntry.partition());
+
+                        if (updateCntr != null && backupEntry.updateCounter() <=
updateCntr)
+                            it.remove();
+                    }
                 }
             }
 
             @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion
topVer) {
-                if (backupQueue.isEmpty())
+                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+                if (backupQueue0 == null)
                     return;
 
                 try {
-                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+                    ClusterNode nodeId0 = ctx.discovery().node(nodeId);
 
-                    for (CacheContinuousQueryEntry e : backupQueue) {
-                        if (!e.isFiltered())
-                            prepareEntry(cctx, nodeId, e);
-                    }
+                    if (nodeId0 != null) {
+                        GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                    ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue,
topic);
+                        for (CacheContinuousQueryEntry e : backupQueue0) {
+                            if (!e.isFiltered())
+                                prepareEntry(cctx, nodeId, e);
+
+                            e.topologyVersion(topVer);
+                        }
+
+                        ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0,
topic);
+                    }
+                    else
+                        // Node which start CQ leave topology. Not needed to put data to
backup queue.
+                        backupQueue = null;
 
-                    backupQueue.clear();
+                    backupQueue0.clear();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(ctx.log(getClass()), "Failed to send backup event notification
to node: " + nodeId, e);
@@ -479,9 +494,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onPartitionEvicted(int part) {
-                for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
it.hasNext();) {
-                    if (it.next().partition() == part)
-                        it.remove();
+                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+                if (backupQueue0 != null) {
+                    for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
it.hasNext(); ) {
+                        if (it.next().partition() == part)
+                            it.remove();
+                    }
                 }
             }
 
@@ -740,7 +759,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!primary && !internal && entry.updateCounter() != -1L /* Skip
init query and expire entries */) {
             entry.markBackup();
 
-            backupQueue.add(entry.forBackupQueue());
+            Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+            if (backupQueue0 != null)
+                backupQueue0.add(entry.forBackupQueue());
         }
 
         return notify;
@@ -765,12 +787,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 if (!locCache) {
                     Collection<CacheEntryEvent<? extends K, ? extends V>> evts
= handleEvent(ctx, entry);
 
-                    if (!evts.isEmpty()) {
+                    if (!evts.isEmpty())
                         locLsnr.onUpdated(evts);
 
-                        if (!internal && !skipPrimaryCheck)
-                            sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId,
ctx);
-                    }
+                    if (!internal && !skipPrimaryCheck)
+                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
                 }
                 else {
                     if (!entry.isFiltered())
@@ -931,7 +952,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param topVer Topology version.
          * @param initCntr Update counters.
          */
-        public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable
Long initCntr) {
+        PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long
initCntr) {
             this.log = log;
 
             if (initCntr != null) {
@@ -1176,6 +1197,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
+    @Override public void onNodeLeft() {
+        Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+
+        if (backupQueue0 != null)
+            backupQueue = null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException
{
         assert ctx != null;
         assert ctx.config().isPeerClassLoadingEnabled();
@@ -1196,7 +1225,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @Override public GridContinuousBatch createBatch() {
-        return new GridContinuousBatchAdapter();
+        return new GridContinuousQueryBatch();
     }
 
     /** {@inheritDoc} */
@@ -1345,7 +1374,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         @SuppressWarnings("unchecked")
         @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
         onAcknowledged(GridContinuousBatch batch) {
-            size += batch.size();
+            assert batch instanceof GridContinuousQueryBatch;
+
+            size += ((GridContinuousQueryBatch)batch).entriesCount();
 
             Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 46e87af..c90746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -116,6 +116,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable
{
     public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext
ctx);
 
     /**
+     * Node which started routine leave topology.
+     */
+    public void onNodeLeft();
+
+    /**
      * @return Topic for ordered notifications. If {@code null}, notifications
      * will be sent in non-ordered messages.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fce48c4..5f61051 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -175,8 +175,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     UUID routineId = e.getKey();
                     RemoteRoutineInfo info = e.getValue();
 
-                    if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
-                        unregisterRemote(routineId);
+                    if (nodeId.equals(info.nodeId)) {
+                        if (info.autoUnsubscribe)
+                            unregisterRemote(routineId);
+
+                        if (info.hnd.isQuery())
+                            info.hnd.onNodeLeft();
+                    }
                 }
 
                 for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet())
{
@@ -865,6 +870,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 try {
                     sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg,
null);
+
+                    info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
new file mode 100644
index 0000000..c5d854b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
+
+/**
+ * Continuous routine batch adapter.
+ */
+public class GridContinuousQueryBatch extends GridContinuousBatchAdapter {
+    /** Entries size included filtered entries. */
+    private final AtomicInteger size = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override public void add(Object obj) {
+        assert obj != null;
+        assert obj instanceof CacheContinuousQueryEntry;
+
+        super.add(obj);
+
+        size.addAndGet(((CacheContinuousQueryEntry)obj).size());
+    }
+
+    /**
+     * @return Entries count.
+     */
+    public int entriesCount() {
+        return size.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff3e00ca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index aea1954..d823409 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -18,18 +18,29 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -46,14 +57,27 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
     /** Keys count. */
     private static final int KEYS_COUNT = 1024;
 
+    /** CQ count. */
+    private static final int QUERY_COUNT = 20;
+
     /** Grid count. */
     private static final int GRID_COUNT = 2;
 
+    /** */
+    private static boolean client = false;
+
+    /** */
+    private static String CACHE_NAME = "test-cache";
+
+    /** */
+    private static final int BACKUP_ACK_THRESHOLD = 100;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        CacheConfiguration ccfg = new CacheConfiguration();
+        CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
+
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setAtomicityMode(ATOMIC);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -61,42 +85,51 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
 
         cfg.setCacheConfiguration(ccfg);
 
+        cfg.setClientMode(client);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         return cfg;
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(GRID_COUNT);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
         stopAllGrids();
+
+        client = false;
     }
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return TimeUnit.MINUTES.toMillis(2);
+        return TimeUnit.MINUTES.toMillis(10);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testBackupQueue() throws Exception {
-        startGridsMultiThreaded(GRID_COUNT);
-
         final CacheEventListener lsnr = new CacheEventListener();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
         qry.setLocalListener(lsnr);
-        qry.setRemoteFilterFactory(new FilterFactory());
+        qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
 
-        try (QueryCursor<?> ignore = grid(0).cache(null).query(qry)) {
+        try (QueryCursor<?> ignore = grid(0).cache(CACHE_NAME).query(qry)) {
             for (int i = 0; i < KEYS_COUNT; i++) {
                 log.info("Put key: " + i);
 
                 for (int j = 0; j < 100; j++)
-                    grid(i % GRID_COUNT).cache(null).put(i, new byte[1024 * 50]);
+                    grid(j % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
             }
 
             log.info("Finish.");
@@ -104,19 +137,150 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testManyQueryBackupQueue() throws Exception {
+        List<QueryCursor> qryCursors = new ArrayList<>();
+
+        for (int i = 0; i < QUERY_COUNT; i++) {
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEventListener());
+            qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
+
+            qryCursors.add(grid(0).cache(CACHE_NAME).query(qry));
+        }
+
+        for (int i = 0; i < KEYS_COUNT; i++) {
+            log.info("Put key: " + i);
+
+            for (int j = 0; j < 150; j++)
+                grid(ThreadLocalRandom.current().nextInt(GRID_COUNT)).cache(CACHE_NAME).put(i,
new byte[1024 * 50]);
+        }
+
+        int size = backupQueueSize();
+
+        assertTrue(size > 0);
+        assertTrue(size <= BACKUP_ACK_THRESHOLD * QUERY_COUNT * /* partition count */1024);
+
+        for (QueryCursor qry : qryCursors)
+            qry.close();
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueueAutoUnsubscribeFalse() throws Exception {
+        try {
+            client = true;
+
+            Ignite client = startGrid(GRID_COUNT);
+
+            awaitPartitionMapExchange();
+
+            List<QueryCursor> qryCursors = new ArrayList<>();
+
+            for (int i = 0; i < QUERY_COUNT; i++) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                qry.setLocalListener(new CacheEventListener());
+                qry.setRemoteFilterFactory(new AlwaysFalseFilterFactory());
+                qry.setAutoUnsubscribe(false);
+
+                qryCursors.add(client.cache(CACHE_NAME).query(qry));
+            }
+
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                log.info("Put key: " + i);
+
+                grid(i % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
+            }
+
+            int size = backupQueueSize();
+
+            assertTrue(size > 0);
+            assertTrue(size <= BACKUP_ACK_THRESHOLD * QUERY_COUNT * /* partition count
*/1024);
+
+            stopGrid(GRID_COUNT);
+
+            awaitPartitionMapExchange();
+
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                log.info("Put key: " + i);
+
+                grid(i % GRID_COUNT).cache(CACHE_NAME).put(i, new byte[1024 * 50]);
+            }
+
+            size = backupQueueSize();
+
+            assertEquals(-1, size);
+        }
+        finally {
+            stopGrid(GRID_COUNT);
+        }
+    }
+
+    /**
+     * @return Backup queue size or {@code -1} if backup queue doesn't exist.
+     */
+    private int backupQueueSize() {
+        int backupQueueSize = -1;
+
+        for (int i = 0; i < GRID_COUNT; i++) {
+            for (Collection<Object> backQueue : backupQueues(grid(i)))
+                backupQueueSize += backQueue.size();
+        }
+
+        return backupQueueSize;
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @return Backup queue for test query.
+     */
+    private List<Collection<Object>> backupQueues(Ignite ignite) {
+        GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
+        Map<Object, Object> infos = new HashMap<>();
+
+        Map<Object, Object> rmtInfos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+        Map<Object, Object> locInfos = GridTestUtils.getFieldValue(proc, "locInfos");
+
+        infos.putAll(rmtInfos);
+        infos.putAll(locInfos);
+
+        List<Collection<Object>> backupQueues = new ArrayList<>();
+
+        for (Object info : infos.values()) {
+            GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+            if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) {
+                Collection<Object> q = GridTestUtils.getFieldValue(hnd,
+                    CacheContinuousQueryHandler.class, "backupQueue");
+
+                if (q != null)
+                    backupQueues.add(q);
+            }
+        }
+
+        return backupQueues;
+    }
+
+    /**
      *
      */
-    private static class FilterFactory implements Factory<CacheEntryEventFilter<Object,
Object>> {
+    private static class AlwaysFalseFilterFactory implements Factory<CacheEntryEventFilter<Object,
Object>> {
         /** {@inheritDoc} */
         @Override public CacheEntryEventFilter<Object, Object> create() {
-            return new CacheEventFilter();
+            return new AlwaysFalseFilter();
         }
     }
 
     /**
      *
      */
-    private static class CacheEventFilter implements CacheEntryEventFilter<Object, Object>,
Serializable {
+    private static class AlwaysFalseFilter implements CacheEntryEventFilter<Object, Object>,
Serializable {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
             return false;


Mime
View raw message