ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [20/51] ignite git commit: Continuous queries fixes: - flush backup queue on exchange end (otherwise we don't really wait for all current operations) - on coordinator apply counters after all single messages received (otherwise extra counter increments a
Date Wed, 31 May 2017 09:23:10 GMT
Continuous queries fixes:
- flush backup queue on exchange end (otherwise we don't really wait for all current operations)
- on coordinator apply counters after all single messages received (otherwise extra counter increments are possible)
- do not send info about filtered entries if do not have non-filtered entry
- added system properties for hardcoded constants


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42293fac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42293fac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42293fac

Branch: refs/heads/ignite-5075-pds
Commit: 42293fac88c29544b7c55c0340224afbf474a301
Parents: 827b7f6
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 29 16:41:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 29 16:41:23 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   6 +-
 .../processors/cache/GridCacheMapEntry.java     |   5 +-
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../dht/GridClientPartitionTopology.java        |  31 +-
 .../dht/GridDhtPartitionTopology.java           |   9 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  59 +-
 .../GridDhtPartitionsExchangeFuture.java        |  51 +-
 .../CacheContinuousQueryAcknowledgeBuffer.java  | 120 +++
 .../CacheContinuousQueryDeployableObject.java   | 110 +++
 .../continuous/CacheContinuousQueryEntry.java   | 117 ++-
 .../CacheContinuousQueryEventBuffer.java        | 483 ++++++++++++
 .../continuous/CacheContinuousQueryHandler.java | 733 +++----------------
 .../CacheContinuousQueryHandlerV2.java          |   6 +-
 .../continuous/CacheContinuousQueryManager.java |  16 +-
 .../CacheContinuousQueryPartitionRecovery.java  | 267 +++++++
 .../continuous/GridContinuousBatchAdapter.java  |   2 +-
 .../continuous/GridContinuousProcessor.java     |  19 +-
 .../continuous/GridContinuousQueryBatch.java    |  16 +-
 ...tinuousQueryAsyncFailoverAtomicSelfTest.java |   1 -
 ...nuousQueryConcurrentPartitionUpdateTest.java | 304 ++++++++
 .../CacheContinuousQueryEventBufferTest.java    | 217 ++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java |  79 +-
 ...niteCacheContinuousQueryBackupQueueTest.java |  13 +-
 ...eCacheContinuousQueryImmutableEntryTest.java |   6 +-
 .../IgniteCacheQuerySelfTestSuite3.java         |   5 +
 25 files changed, 1885 insertions(+), 792 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index fdd29e4..bb31645 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -874,7 +874,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         finally {
             // Reset thread local context.
             cctx.tm().resetContext();
-            cctx.mvcc().contextReset();
+
+            GridCacheMvccManager mvcc = cctx.mvcc();
+
+            if (mvcc != null)
+                mvcc.contextReset();
 
             // Unwind eviction notifications.
             if (msg instanceof IgniteTxStateAware) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4f87658..7c7fc99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,7 +28,6 @@ import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
-
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -62,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
-import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -76,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5314088..2eec8f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1312,7 +1312,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue()) != null;
 
                         cctx.affinity().checkRebalanceState(top, cacheId);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 1de64c5..43bc609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -650,11 +650,29 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+        assert cntrMap != null;
+
+        lock.writeLock().lock();
+
+        try {
+            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+                if (cntr == null || cntr.get2() < e.getValue().get2())
+                    this.cntrMap.put(e.getKey(), e.getValue());
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(
         @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        Map<Integer, T2<Long, Long>> cntrMap
+        GridDhtPartitionMap parts
     ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -733,15 +751,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 }
             }
 
-            if (cntrMap != null) {
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-                    if (cntr == null || cntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-            }
-
             consistencyCheck();
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..ffc1d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -234,12 +234,15 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
-     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
     @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap);
+        GridDhtPartitionMap parts);
+
+    /**
+     * @param cntrMap Counters map.
+     */
+    public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
 
     /**
      * Checks if there is at least one owner for each partition in the cache topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8e79eda..7adce6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1256,11 +1256,45 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+        assert cntrMap != null;
+
+        lock.writeLock().lock();
+
+        try {
+            if (stopping)
+                return;
+
+            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+                if (cntr == null || cntr.get2() < e.getValue().get2())
+                    this.cntrMap.put(e.getKey(), e.getValue());
+            }
+
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part == null)
+                    continue;
+
+                T2<Long, Long> cntr = cntrMap.get(part.id());
+
+                if (cntr != null && cntr.get2() > part.updateCounter())
+                    part.updateCounter(cntr.get2());
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(
         @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap
+        GridDhtPartitionMap parts
     ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -1279,27 +1313,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (cntrMap != null) {
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
-                    if (cntr == null || cntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
-                for (int i = 0; i < locParts.length(); i++) {
-                    GridDhtLocalPartition part = locParts.get(i);
-
-                    if (part == null)
-                        continue;
-
-                    T2<Long, Long> cntr = cntrMap.get(part.id());
-
-                    if (cntr != null && cntr.get2() > part.updateCounter())
-                        part.updateCounter(cntr.get2());
-                }
-            }
-
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 544f847..72c5bbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.PartitionLossPolicy;
@@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -789,14 +789,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
-        //todo check
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
                 continue;
 
             if (topChanged) {
-                cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
-
                 // Partition release future is done so we can flush the write-behind store.
                 cacheCtx.store().forceFlush();
             }
@@ -1101,10 +1098,31 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    /**
+     * @return {@code True} if exchange triggered by server node join or fail.
+     */
+    private boolean serverNodeDiscoveryEvent() {
+        assert discoEvt != null;
+
+        return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
         boolean realExchange = !dummy && !forcePreload;
 
+        if (err == null &&
+            realExchange &&
+            !cctx.kernalContext().clientNode() &&
+            (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
+                    continue;
+
+                cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
+            }
+       }
+
         if (err == null && realExchange) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
@@ -1554,6 +1572,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
 
+            for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
+                if (msg instanceof GridDhtPartitionsSingleMessage) {
+                    GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+
+                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
+                        Integer cacheId = entry.getKey();
+                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+                        GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
+                            cctx.exchange().clientTopology(cacheId, this);
+
+                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId);
+
+                        if (cntrs != null)
+                            top.applyUpdateCounters(cntrs);
+                    }
+                }
+            }
+
             if (discoEvt.type() == EVT_NODE_JOINED) {
                 if (cctx.kernalContext().state().active())
                     assignPartitionsStates();
@@ -1785,7 +1822,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+            top.update(exchId, entry.getValue());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
new file mode 100644
index 0000000..c95dc42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class CacheContinuousQueryAcknowledgeBuffer {
+    /** */
+    private int size;
+
+    /** */
+    @GridToStringInclude
+    private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+    /** */
+    @GridToStringInclude
+    private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+    /**
+     * @param batch Batch.
+     * @return Non-null tuple if acknowledge should be sent to backups.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>onAcknowledged(
+        GridContinuousBatch batch) {
+        assert batch instanceof GridContinuousQueryBatch;
+
+        size += ((GridContinuousQueryBatch)batch).entriesCount();
+
+        Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+        for (CacheContinuousQueryEntry e : entries)
+            addEntry(e);
+
+        return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+    }
+
+    /**
+     * @param e Entry.
+     * @return Non-null tuple if acknowledge should be sent to backups.
+     */
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+    onAcknowledged(CacheContinuousQueryEntry e) {
+        size++;
+
+        addEntry(e);
+
+        return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+    }
+
+    /**
+     * @param e Entry.
+     */
+    private void addEntry(CacheContinuousQueryEntry e) {
+        topVers.add(e.topologyVersion());
+
+        Long cntr0 = updateCntrs.get(e.partition());
+
+        if (cntr0 == null || e.updateCounter() > cntr0)
+            updateCntrs.put(e.partition(), e.updateCounter());
+    }
+
+    /**
+     * @return Non-null tuple if acknowledge should be sent to backups.
+     */
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        acknowledgeOnTimeout() {
+        return size > 0 ? acknowledgeData() : null;
+    }
+
+    /**
+     * @return Tuple with acknowledge information.
+     */
+    private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+        assert size > 0;
+
+        Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
+
+        IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+            new IgniteBiTuple<>(cntrs, topVers);
+
+        topVers = U.newHashSet(1);
+
+        size = 0;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryAcknowledgeBuffer.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
new file mode 100644
index 0000000..f888467
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Deployable object.
+ */
+class CacheContinuousQueryDeployableObject implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Serialized object. */
+    private byte[] bytes;
+
+    /** Deployment class name. */
+    private String clsName;
+
+    /** Deployment info. */
+    private GridDeploymentInfo depInfo;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public CacheContinuousQueryDeployableObject() {
+        // No-op.
+    }
+
+    /**
+     * @param obj Object.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected CacheContinuousQueryDeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+        assert obj != null;
+        assert ctx != null;
+
+        Class cls = U.detectClass(obj);
+
+        clsName = cls.getName();
+
+        GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
+
+        if (dep == null)
+            throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
+
+        depInfo = new GridDeploymentInfoBean(dep);
+
+        bytes = U.marshal(ctx, obj);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param ctx Kernal context.
+     * @return Deserialized object.
+     * @throws IgniteCheckedException In case of error.
+     */
+    <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        assert ctx != null;
+
+        GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+            depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+
+        if (dep == null)
+            throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+        return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeByteArray(out, bytes);
+        U.writeString(out, clsName);
+        out.writeObject(depInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        bytes = U.readByteArray(in);
+        clsName = U.readString(in);
+        depInfo = (GridDeploymentInfo)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index bf2a691..7e3f0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private static final byte FILTERED_ENTRY = 0b0010;
 
     /** */
+    private static final byte KEEP_BINARY = 0b0100;
+
+    /** */
     private static final EventType[] EVT_TYPE_VALS = EventType.values();
 
     /**
@@ -105,11 +108,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     @GridToStringInclude
     private AffinityTopologyVersion topVer;
 
-    /** Filtered events. */
-    private GridLongList filteredEvts;
-
-    /** Keep binary. */
-    private boolean keepBinary;
+    /** */
+    private long filteredCnt;
 
     /**
      * Required by {@link Message}.
@@ -124,9 +124,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param keepBinary Keep binary flag.
      * @param part Partition.
      * @param updateCntr Update partition counter.
      * @param topVer Topology version if applicable.
+     * @param flags Flags.
      */
     CacheContinuousQueryEntry(
         int cacheId,
@@ -137,7 +139,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
         boolean keepBinary,
         int part,
         long updateCntr,
-        @Nullable AffinityTopologyVersion topVer) {
+        @Nullable AffinityTopologyVersion topVer,
+        byte flags) {
         this.cacheId = cacheId;
         this.evtType = evtType;
         this.key = key;
@@ -146,7 +149,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
         this.part = part;
         this.updateCntr = updateCntr;
         this.topVer = topVer;
-        this.keepBinary = keepBinary;
+        this.flags = flags;
+
+        if (keepBinary)
+            this.flags |= KEEP_BINARY;
+    }
+
+    /**
+     * @return Flags.
+     */
+    public byte flags() {
+        return flags;
     }
 
     /**
@@ -207,26 +220,40 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
-     * @return Size include this event and filtered.
+     * @param filteredCnt Number of entries filtered before this entry.
+     */
+    void filteredCount(long filteredCnt) {
+        assert filteredCnt >= 0 : filteredCnt;
+
+        this.filteredCnt = filteredCnt;
+    }
+
+    /**
+     * @return Number of entries filtered before this entry.
      */
-    public int size() {
-        return filteredEvts != null ? filteredEvts.size() + 1 : 1;
+    long filteredCount() {
+        return filteredCnt;
     }
 
     /**
      * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
      * (avoid to huge memory consumption), otherwise {@code this}.
      */
-    CacheContinuousQueryEntry forBackupQueue() {
+    CacheContinuousQueryEntry copyWithDataReset() {
         if (!isFiltered())
             return this;
 
-        CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
-                cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer);
-
-        e.flags = flags;
-
-        return e;
+        return new CacheContinuousQueryEntry(
+            cacheId,
+            null,
+            null,
+            null,
+            null,
+            false,
+            part,
+            updateCntr,
+            topVer,
+            flags);
     }
 
     /**
@@ -247,21 +274,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @return Keep binary flag.
      */
     boolean isKeepBinary() {
-        return keepBinary;
-    }
-
-    /**
-     * @param cntrs Filtered events.
-     */
-    void filteredEvents(GridLongList cntrs) {
-        filteredEvts = cntrs;
-    }
-
-    /**
-     * @return previous filtered events.
-     */
-    long[] filteredEvents() {
-        return filteredEvts == null ? null : filteredEvts.array();
+        return (flags & KEEP_BINARY) != 0;
     }
 
     /**
@@ -363,7 +376,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage("filteredEvts", filteredEvts))
+                if (!writer.writeLong("filteredCnt", filteredCnt))
                     return false;
 
                 writer.incrementState();
@@ -375,42 +388,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
                 if (!writer.writeMessage("key", isFiltered() ? null : key))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeMessage("newVal", isFiltered() ? null : newVal))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
@@ -446,7 +453,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 2:
-                filteredEvts = reader.readMessage("filteredEvts");
+                filteredCnt = reader.readLong("filteredCnt");
 
                 if (!reader.isLastRead())
                     return false;
@@ -462,14 +469,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 4:
-                keepBinary = reader.readBoolean("keepBinary");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
                 key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
@@ -477,7 +476,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 newVal = reader.readMessage("newVal");
 
                 if (!reader.isLastRead())
@@ -485,7 +484,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 oldVal = reader.readMessage("oldVal");
 
                 if (!reader.isLastRead())
@@ -493,7 +492,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 part = reader.readInt("part");
 
                 if (!reader.isLastRead())
@@ -501,7 +500,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -509,7 +508,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -524,7 +523,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
new file mode 100644
index 0000000..336f650
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ *
+ */
+public class CacheContinuousQueryEventBuffer {
+    /** */
+    private static final int BUF_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
+
+    /** */
+    private static final Object RETRY = new Object();
+
+    /** */
+    protected final int part;
+
+    /** */
+    private AtomicReference<Batch> curBatch = new AtomicReference<>();
+
+    /** */
+    private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque8<>();
+
+    /** */
+    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+    /**
+     * @param part Partition number.
+     */
+    CacheContinuousQueryEventBuffer(int part) {
+        this.part = part;
+    }
+
+    /**
+     * @param updateCntr Acknowledged counter.
+     */
+    void cleanupBackupQueue(Long updateCntr) {
+        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
+
+        while (it.hasNext()) {
+            CacheContinuousQueryEntry backupEntry = it.next();
+
+            if (backupEntry.updateCounter() <= updateCntr)
+                it.remove();
+        }
+    }
+
+    /**
+     * @return Backup entries.
+     */
+    @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
+        TreeMap<Long, CacheContinuousQueryEntry> ret = null;
+
+        int size = backupQ.sizex();
+
+        if (size > 0) {
+            ret = new TreeMap<>();
+
+            for (int i = 0; i < size; i++) {
+                CacheContinuousQueryEntry e = backupQ.pollFirst();
+
+                if (e != null)
+                    ret.put(e.updateCounter(), e);
+                else
+                    break;
+            }
+        }
+
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            ret = batch.flushCurrentEntries(ret);
+
+        if (!pending.isEmpty()) {
+            if (ret == null)
+                ret = new TreeMap<>();
+
+            for (CacheContinuousQueryEntry e : pending.values())
+                ret.put(e.updateCounter(), e);
+        }
+
+        return ret != null ? ret.values() : null;
+    }
+
+    /**
+     * @return Initial partition counter.
+     */
+    protected long currentPartitionCounter() {
+        return 0;
+    }
+
+    /**
+     * For test purpose only.
+     *
+     * @return Current number of filtered events.
+     */
+    long currentFiltered() {
+        Batch batch = curBatch.get();
+
+        return batch != null ? batch.filtered : 0;
+    }
+
+    /**
+     * @param e Entry to process.
+     * @param backup Backup entry flag.
+     * @return Collected entries to pass to listener (single entry or entries list).
+     */
+    @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) {
+        return process0(e.updateCounter(), e, backup);
+    }
+
+    /**
+     * @param backup Backup entry flag.
+     * @param cntr Entry counter.
+     * @param entry Entry.
+     * @return Collected entries.
+     */
+    private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
+        assert cntr >= 0 : cntr;
+
+        Batch batch;
+        Object res = null;
+
+        for (;;) {
+            batch = initBatch(entry.topologyVersion());
+
+            if (batch == null || cntr < batch.startCntr) {
+                if (backup)
+                    backupQ.add(entry);
+
+                return entry;
+            }
+
+            if (cntr <= batch.endCntr) {
+                res = batch.processEntry0(null, cntr, entry, backup);
+
+                if (res == RETRY)
+                    continue;
+            }
+            else
+                pending.put(cntr, entry);
+
+            break;
+        }
+
+        Batch batch0 = curBatch.get();
+
+        if (batch0 != batch) {
+            do {
+                batch = batch0;
+
+                res = processPending(res, batch, backup);
+
+                batch0 = initBatch(entry.topologyVersion());
+            }
+            while (batch != batch0);
+        }
+
+        return res;
+    }
+
+    /**
+     * @param topVer Current event topology version.
+     * @return Current batch.
+     */
+    @Nullable private Batch initBatch(AffinityTopologyVersion topVer) {
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            return batch;
+
+        for (;;) {
+            long curCntr = currentPartitionCounter();
+
+            if (curCntr == -1)
+                return null;
+
+            batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
+
+            if (curBatch.compareAndSet(null, batch))
+                return batch;
+
+            batch = curBatch.get();
+
+            if (batch != null)
+                return batch;
+        }
+    }
+
+    /**
+     * @param res Current result.
+     * @param batch Current batch.
+     * @param backup Backup entry flag.
+     * @return New result.
+     */
+    @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
+        if (pending.floorKey(batch.endCntr) != null) {
+            for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
+                long cntr = p.getKey();
+
+                assert cntr <= batch.endCntr;
+
+                if (pending.remove(p.getKey()) != null) {
+                    if (cntr < batch.startCntr)
+                        res = addResult(res, p.getValue(), backup);
+                    else
+                        res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * @param res Current result.
+     * @param entry Entry to add.
+     * @param backup Backup entry flag.
+     * @return Updated result.
+     */
+    @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry, boolean backup) {
+        if (res == null) {
+            if (backup)
+                backupQ.add(entry);
+            else
+                res = entry;
+        }
+        else {
+            assert !backup;
+
+            List<CacheContinuousQueryEntry> resList;
+
+            if (res instanceof CacheContinuousQueryEntry) {
+                resList = new ArrayList<>();
+
+                resList.add((CacheContinuousQueryEntry)res);
+            }
+            else {
+                assert res instanceof List : res;
+
+                resList = (List<CacheContinuousQueryEntry>)res;
+            }
+
+            resList.add(entry);
+
+            res = resList;
+        }
+
+        return res;
+    }
+
+    /**
+     *
+     */
+    private class Batch {
+        /** */
+        private long filtered;
+
+        /** */
+        private final long startCntr;
+
+        /** */
+        private final long endCntr;
+
+        /** */
+        private int lastProc = -1;
+
+        /** */
+        private CacheContinuousQueryEntry[] entries;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * @param filtered Number of filtered events before this batch.
+         * @param entries Entries array.
+         * @param topVer Current event topology version.
+         * @param startCntr Start counter.
+         */
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+            assert startCntr >= 0;
+            assert filtered >= 0;
+
+            this.startCntr = startCntr;
+            this.filtered = filtered;
+            this.entries = entries;
+            this.topVer = topVer;
+
+            endCntr = startCntr + BUF_SIZE - 1;
+        }
+
+        /**
+         * @param res Current entries.
+         * @return Entries to send as part of backup queue.
+         */
+        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
+            if (entries == null)
+                return res;
+
+            long filtered = this.filtered;
+            long cntr = startCntr;
+
+            for (int i = 0; i < entries.length; i++) {
+                CacheContinuousQueryEntry e = entries[i];
+
+                CacheContinuousQueryEntry flushEntry = null;
+
+                if (e == null) {
+                    if (filtered != 0) {
+                        flushEntry = filteredEntry(cntr - 1, filtered - 1);
+
+                        filtered = 0;
+                    }
+                }
+                else {
+                    if (e.isFiltered())
+                        filtered++;
+                    else {
+                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
+                            e.eventType(),
+                            e.key(),
+                            e.value(),
+                            e.oldValue(),
+                            e.isKeepBinary(),
+                            e.partition(),
+                            e.updateCounter(),
+                            e.topologyVersion(),
+                            e.flags());
+
+                        flushEntry.filteredCount(filtered);
+
+                        filtered = 0;
+                    }
+                }
+
+                if (flushEntry != null) {
+                    if (res == null)
+                        res = new TreeMap<>();
+
+                    res.put(flushEntry.updateCounter(), flushEntry);
+                }
+
+                cntr++;
+            }
+
+            if (filtered != 0L) {
+                if (res == null)
+                    res = new TreeMap<>();
+
+                CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered - 1);
+
+                res.put(flushEntry.updateCounter(), flushEntry);
+            }
+
+            return res;
+        }
+
+        /**
+         * @param cntr Entry counter.
+         * @param filtered Number of entries filtered before this entry.
+         * @return Entry.
+         */
+        private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
+            CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0,
+                null,
+                null,
+                null,
+                null,
+                false,
+                part,
+                cntr,
+                topVer,
+                (byte)0);
+
+            e.markFiltered();
+
+            e.filteredCount(filtered);
+
+            return e;
+        }
+
+        /**
+         * @param res Current result.
+         * @param cntr Entry counter.
+         * @param entry Entry.
+         * @param backup Backup entry flag.
+         * @return New result.
+         */
+        @SuppressWarnings("unchecked")
+        @Nullable private Object processEntry0(
+            @Nullable Object res,
+            long cntr,
+            CacheContinuousQueryEntry entry,
+            boolean backup) {
+            int pos = (int)(cntr - startCntr);
+
+            synchronized (this) {
+                if (entries == null)
+                    return RETRY;
+
+                entry = entry.copyWithDataReset();
+
+                entries[pos] = entry;
+
+                int next = lastProc + 1;
+
+                if (next == pos) {
+                    for (int i = next; i < entries.length; i++) {
+                        CacheContinuousQueryEntry entry0 = entries[i];
+
+                        if (entry0 != null) {
+                            if (!entry0.isFiltered()) {
+                                entry0.filteredCount(filtered);
+
+                                filtered = 0;
+
+                                res = addResult(res, entry0, backup);
+                            }
+                            else
+                                filtered++;
+
+                            pos = i;
+                        }
+                        else
+                            break;
+                    }
+
+                    lastProc = pos;
+
+                    if (pos == entries.length - 1) {
+                        Arrays.fill(entries, null);
+
+                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
+                            filtered,
+                            entries,
+                            entry.topologyVersion());
+
+                        entries = null;
+
+                        assert curBatch.get() == this;
+
+                        curBatch.set(nextBatch);
+                    }
+                }
+                else
+                    return res;
+            }
+
+            return res;
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message