ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [30/39] ignite git commit: cc
Date Fri, 26 May 2017 14:06:30 GMT
cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3500de9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3500de9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3500de9

Branch: refs/heads/ignite-5075-cc-debug
Commit: e3500de9f775b091635b06108efc4937f524b6a1
Parents: 01f45c1
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 26 11:21:25 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 26 11:28:35 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  6 +-
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../dht/GridClientPartitionTopology.java        | 31 ++++---
 .../dht/GridDhtPartitionTopology.java           |  9 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 59 ++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        | 34 ++++++--
 .../CacheContinuousQueryEventBuffer.java        | 91 ++++++++++++--------
 .../continuous/CacheContinuousQueryManager.java |  4 +-
 8 files changed, 151 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 80f872c..87fe18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -28,7 +28,6 @@ import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
-
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -61,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
-import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -75,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -1838,8 +1836,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
             if (updateRes.success())
                 updateMetrics(c.op, metrics);
 
-            lsnrs = cctx.continuousQueries().updateListeners(internal, false);
-
             // Continuous query filter should be perform under lock.
             if (lsnrs != null) {
                 CacheObject evtVal = cctx.unwrapTemporary(updateVal);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5314088..2eec8f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1312,7 +1312,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue()) != null;
 
                         cctx.affinity().checkRebalanceState(top, cacheId);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 1de64c5..43bc609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -650,11 +650,29 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap)
{
+        assert cntrMap != null;
+
+        lock.writeLock().lock();
+
+        try {
+            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+                if (cntr == null || cntr.get2() < e.getValue().get2())
+                    this.cntrMap.put(e.getKey(), e.getValue());
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(
         @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        Map<Integer, T2<Long, Long>> cntrMap
+        GridDhtPartitionMap parts
     ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts)
+ ']');
@@ -733,15 +751,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
                 }
             }
 
-            if (cntrMap != null) {
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet())
{
-                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-                    if (cntr == null || cntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-            }
-
             consistencyCheck();
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..ffc1d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -234,12 +234,15 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
-     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
     @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap);
+        GridDhtPartitionMap parts);
+
+    /**
+     * @param cntrMap Counters map.
+     */
+    public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
 
     /**
      * Checks if there is at least one owner for each partition in the cache topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8e79eda..7adce6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1256,11 +1256,45 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap)
{
+        assert cntrMap != null;
+
+        lock.writeLock().lock();
+
+        try {
+            if (stopping)
+                return;
+
+            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+                if (cntr == null || cntr.get2() < e.getValue().get2())
+                    this.cntrMap.put(e.getKey(), e.getValue());
+            }
+
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part == null)
+                    continue;
+
+                T2<Long, Long> cntr = cntrMap.get(part.id());
+
+                if (cntr != null && cntr.get2() > part.updateCounter())
+                    part.updateCounter(cntr.get2());
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(
         @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap
+        GridDhtPartitionMap parts
     ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts)
+ ']');
@@ -1279,27 +1313,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             if (stopping)
                 return null;
 
-            if (cntrMap != null) {
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet())
{
-                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-                    if (cntr == null || cntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
-                for (int i = 0; i < locParts.length(); i++) {
-                    GridDhtLocalPartition part = locParts.get(i);
-
-                    if (part == null)
-                        continue;
-
-                    T2<Long, Long> cntr = cntrMap.get(part.id());
-
-                    if (cntr != null && cntr.get2() > part.updateCounter())
-                        part.updateCounter(cntr.get2());
-                }
-            }
-
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId)
> 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore)
[lastExchId=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8b8c87c..dfea951 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.PartitionLossPolicy;
@@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -1098,6 +1098,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    /**
+     * @return {@code True} if exchange triggered by server node join or fail.
+     */
     private boolean serverNotDiscoveryEvent() {
         return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
     }
@@ -1108,10 +1111,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         if (realExchange && !cctx.kernalContext().clientNode() && (serverNotDiscoveryEvent()
|| affChangeMsg != null)) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+                if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+                cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
             }
        }
 
@@ -1564,6 +1567,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
 
+            for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
+                if (msg instanceof GridDhtPartitionsSingleMessage) {
+                    GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+
+                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet())
{
+                        Integer cacheId = entry.getKey();
+                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+                        GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology()
:
+                            cctx.exchange().clientTopology(cacheId, this);
+
+                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId);
+
+                        if (cntrs != null)
+                            top.applyUpdateCounters(cntrs);
+                    }
+                }
+            }
+
             if (discoEvt.type() == EVT_NODE_JOINED) {
                 if (cctx.kernalContext().state().active())
                     assignPartitionsStates();
@@ -1795,7 +1817,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+            top.update(exchId, entry.getValue());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index c59b851..a072240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -112,8 +112,9 @@ public class CacheContinuousQueryEventBuffer {
                 ret.addAll(pending.values());
             }
 
-            if (curBatch.compareAndSet(batch, null))
-                break;
+            break;
+//            if (curBatch.compareAndSet(batch, null))
+//                break;
         }
 
         return ret;
@@ -235,10 +236,14 @@ public class CacheContinuousQueryEventBuffer {
             for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr,
true).entrySet()) {
                 long cntr = p.getKey();
 
-                assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr;
+                assert cntr <= batch.endCntr;
 
-                if (pending.remove(p.getKey()) != null)
-                    res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+                if (pending.remove(p.getKey()) != null) {
+                    if (cntr < batch.startCntr)
+                        res = addResult(res, p.getValue(), backup);
+                    else
+                        res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+                }
             }
         }
 
@@ -246,6 +251,43 @@ public class CacheContinuousQueryEventBuffer {
     }
 
     /**
+     * @param res Current result.
+     * @param entry Entry to add.
+     * @param backup Backup entry flag.
+     * @return Updated result.
+     */
+    @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry,
boolean backup) {
+        if (res == null) {
+            if (backup)
+                backupQ.add(entry);
+            else
+                res = entry;
+        }
+        else {
+            assert !backup;
+
+            List<CacheContinuousQueryEntry> resList;
+
+            if (res instanceof CacheContinuousQueryEntry) {
+                resList = new ArrayList<>();
+
+                resList.add((CacheContinuousQueryEntry)res);
+            }
+            else {
+                assert res instanceof List : res;
+
+                resList = (List<CacheContinuousQueryEntry>)res;
+            }
+
+            resList.add(entry);
+
+            res = resList;
+        }
+
+        return res;
+    }
+
+    /**
      *
      */
     private class Batch {
@@ -313,7 +355,15 @@ public class CacheContinuousQueryEventBuffer {
                     if (e.isFiltered())
                         filtered++;
                     else {
-                        flushEntry = e;
+                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
+                            e.eventType(),
+                            e.key(),
+                            e.value(),
+                            e.oldValue(),
+                            e.isKeepBinary(),
+                            e.partition(),
+                            e.updateCounter(),
+                            e.topologyVersion());
 
                         flushEntry.filteredCount(filtered);
 
@@ -338,8 +388,6 @@ public class CacheContinuousQueryEventBuffer {
                 res.add(filteredEntry(cntr - 1, filtered - 1));
             }
 
-            entries = null;
-
             return res;
         }
 
@@ -399,32 +447,7 @@ public class CacheContinuousQueryEventBuffer {
 
                                 filtered = 0;
 
-                                if (res == null) {
-                                    if (backup)
-                                        backupQ.add(entry0);
-                                    else
-                                        res = entry0;
-                                }
-                                else {
-                                    assert !backup;
-
-                                    List<CacheContinuousQueryEntry> resList;
-
-                                    if (res instanceof CacheContinuousQueryEntry) {
-                                        resList = new ArrayList<>();
-
-                                        resList.add((CacheContinuousQueryEntry)res);
-                                    }
-                                    else {
-                                        assert res instanceof List : res;
-
-                                        resList = (List<CacheContinuousQueryEntry>)res;
-                                    }
-
-                                    resList.add(entry0);
-
-                                    res = resList;
-                                }
+                                res = addResult(res, entry0, backup);
                             }
                             else
                                 filtered++;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index acf351f..7cbb1e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -568,9 +568,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
     }
 
     /**
-     * @param topVer Topology version.
+     * @param topVer Finished exchange topology version.
      */
-    public void beforeExchange(AffinityTopologyVersion topVer) {
+    public void flushBackupQueue(AffinityTopologyVersion topVer) {
         for (CacheContinuousQueryListener lsnr : lsnrs.values())
             lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
     }


Mime
View raw message