ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [08/50] [abbrv] ignite git commit: Continuous queries fixes: - flush backup queue on exchange end (otherwise we don't really wait for all current operations) - on coordinator apply counters after all single messages received (otherwise extra counter incr
Date Thu, 01 Jun 2017 13:23:23 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e7706dd..e5347c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -24,17 +24,12 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -42,23 +37,22 @@ import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
@@ -67,9 +61,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
-import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -81,7 +72,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -94,7 +84,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int BACKUP_ACK_THRESHOLD = 100;
+    static final int BACKUP_ACK_THRESHOLD =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100);
+
+    /** */
+    static final int LSNR_MAX_BUF_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
     /** Cache name. */
     private String cacheName;
@@ -109,7 +104,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
     /** Deployable object for filter. */
-    private DeployableObject rmtFilterDep;
+    private CacheContinuousQueryDeployableObject rmtFilterDep;
 
     /** Internal flag. */
     private boolean internal;
@@ -132,9 +127,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
-    /** Backup queue. */
-    private transient volatile Collection<CacheContinuousQueryEntry> backupQueue;
-
     /** */
     private boolean locCache;
 
@@ -142,13 +134,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private boolean keepBinary;
 
     /** */
-    private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+    private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs;
 
     /** */
-    private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
+    private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs;
 
     /** */
-    private transient AcknowledgeBuffer ackBuf;
+    private transient CacheContinuousQueryAcknowledgeBuffer ackBuf;
 
     /** */
     private transient int cacheId;
@@ -163,6 +155,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
+    private transient volatile boolean nodeLeft;
+
+    /** */
     private transient boolean ignoreClsNotFound;
 
     /** */
@@ -337,9 +332,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         entryBufs = new ConcurrentHashMap<>();
 
-        backupQueue = new ConcurrentLinkedDeque8<>();
-
-        ackBuf = new AcknowledgeBuffer();
+        ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
 
         rcvs = new ConcurrentHashMap<>();
 
@@ -409,7 +402,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                 }
                 else {
-                    final boolean notify = filter(evt, primary);
+                    final boolean notify = filter(evt);
 
                     if (log.isDebugEnabled())
                         log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary
@@ -429,6 +422,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             }, sync);
                         }
                     }
+                    else
+                        handleBackupEntry(cctx, evt.entry());
                 }
             }
 
@@ -438,50 +433,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-                if (backupQueue0 != null) {
-                    Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
-
-                    while (it.hasNext()) {
-                        CacheContinuousQueryEntry backupEntry = it.next();
+                for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) {
+                    CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey());
 
-                        Long updateCntr = updateCntrs.get(backupEntry.partition());
-
-                        if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
-                            it.remove();
-                    }
+                    if (buf != null)
+                        buf.cleanupBackupQueue(e.getValue());
                 }
             }
 
             @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
                 assert topVer != null;
 
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+                try {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                if (backupQueue0 == null)
-                    return;
+                    ClusterNode node = ctx.discovery().node(nodeId);
 
-                try {
-                    ClusterNode nodeId0 = ctx.discovery().node(nodeId);
+                    for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+                        CacheContinuousQueryEventBuffer buf = bufE.getValue();
 
-                    if (nodeId0 != null) {
-                        GridCacheContext<K, V> cctx = cacheContext(ctx);
+                        Collection<CacheContinuousQueryEntry> backupQueue = buf.flushOnExchange();
 
-                        for (CacheContinuousQueryEntry e : backupQueue0) {
-                            if (!e.isFiltered())
-                                prepareEntry(cctx, nodeId, e);
+                        if (backupQueue != null && node != null) {
+                            for (CacheContinuousQueryEntry e : backupQueue) {
+                                e.markBackup();
 
-                            e.topologyVersion(topVer);
-                        }
+                                if (!e.isFiltered())
+                                    prepareEntry(cctx, nodeId, e);
+                            }
 
-                        ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic);
+                            ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+                        }
                     }
-                    else
-                        // Node which start CQ leave topology. Not needed to put data to backup queue.
-                        backupQueue = null;
-
-                    backupQueue0.clear();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
@@ -505,14 +488,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onPartitionEvicted(int part) {
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-                if (backupQueue0 != null) {
-                    for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) {
-                        if (it.next().partition() == part)
-                            it.remove();
-                    }
-                }
+                entryBufs.remove(part);
             }
 
             @Override public boolean oldValueRequired() {
@@ -739,17 +715,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
+        CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
 
     /**
-     * @param primary Primary.
      * @param evt Query event.
      * @return {@code True} if event passed filter otherwise {@code false}.
      */
-    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+    public boolean filter(CacheContinuousQueryEvent evt) {
         CacheContinuousQueryEntry entry = evt.entry();
 
         boolean notify = !entry.isFiltered();
@@ -765,15 +740,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!notify)
             entry.markFiltered();
 
-        if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
-            entry.markBackup();
-
-            Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-            if (backupQueue0 != null)
-                backupQueue0.add(entry.forBackupQueue());
-        }
-
         return notify;
     }
 
@@ -811,13 +777,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 if (!entry.isFiltered())
                     prepareEntry(cctx, nodeId, entry);
 
-                CacheContinuousQueryEntry e = handleEntry(entry);
+                Object entryOrList = handleEntry(cctx, entry);
 
-                if (e != null) {
+                if (entryOrList != null) {
                     if (log.isDebugEnabled())
-                        log.debug("Send the following event to listener: " + e);
+                        log.debug("Send the following event to listener: " + entryOrList);
 
-                    ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+                    ctx.continuous().addNotification(nodeId, routineId, entryOrList, topic, sync, true);
                 }
             }
         }
@@ -865,7 +831,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (internal)
             return;
 
-        for (PartitionRecovery rec : rcvs.values())
+        for (CacheContinuousQueryPartitionRecovery rec : rcvs.values())
             rec.resetTopologyCache();
     }
 
@@ -875,12 +841,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+    @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
         int partId,
         AffinityTopologyVersion topVer) {
         assert topVer != null && topVer.topologyVersion() > 0 : topVer;
 
-        PartitionRecovery rec = rcvs.get(partId);
+        CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
@@ -905,10 +871,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             else if (initUpdCntrs != null)
                 partCntrs = initUpdCntrs.get(partId);
 
-            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
+            rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
+            CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
@@ -918,10 +884,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @param cctx Cache context.
+     * @param e Entry.
+     */
+    private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
+        if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries.
+            return;
+
+        CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
+
+        buf.processEntry(e.copyWithDataReset(), true);
+    }
+
+    /**
+     * @param cctx Cache context.
      * @param e Entry.
      * @return Entry.
      */
-    private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+    private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
         assert e != null;
         assert entryBufs != null;
 
@@ -934,354 +914,52 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         // Initial query entry.
         // These events should be fired immediately.
-        if (e.updateCounter() == -1)
+        if (e.updateCounter() == -1L)
             return e;
 
-        EntryBuffer buf = entryBufs.get(e.partition());
+        CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
 
-        if (buf == null) {
-            buf = new EntryBuffer();
-
-            EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
-
-            if (oldRec != null)
-                buf = oldRec;
-        }
-
-        return buf.handle(e);
+        return buf.processEntry(e, false);
     }
 
     /**
-     *
+     * @param cctx Cache context.
+     * @param part Partition.
+     * @return Event buffer.
      */
-    private static class PartitionRecovery {
-        /** Event which means hole in sequence. */
-        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
-
-        /** */
-        private final static int MAX_BUFF_SIZE = 100;
-
-        /** */
-        private IgniteLogger log;
-
-        /** */
-        private long lastFiredEvt;
-
-        /** */
-        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
-
-        /** */
-        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
-
-        /**
-         * @param log Logger.
-         * @param topVer Topology version.
-         * @param initCntr Update counters.
-         */
-        PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
-            this.log = log;
-
-            if (initCntr != null) {
-                assert topVer.topologyVersion() > 0 : topVer;
-
-                this.lastFiredEvt = initCntr;
-
-                curTop = topVer;
-            }
-        }
-
-        /**
-         * Resets cached topology.
-         */
-        void resetTopologyCache() {
-            curTop = AffinityTopologyVersion.NONE;
-        }
-
-        /**
-         * Add continuous entry.
-         *
-         * @param cctx Cache context.
-         * @param cache Cache.
-         * @param entry Cache continuous query entry.
-         * @return Collection entries which will be fired. This collection should contains only non-filtered events.
-         */
-        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
-            CacheContinuousQueryEntry entry,
-            GridCacheContext cctx,
-            IgniteCache cache
-        ) {
-            assert entry != null;
-
-            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
-                assert entry.updateCounter() == 0L : entry;
-
-                return F.<CacheEntryEvent<? extends K, ? extends V>>
-                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-            }
-
-            List<CacheEntryEvent<? extends K, ? extends V>> entries;
-
-            synchronized (pendingEvts) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
-                        ", curTop=" + curTop +
-                        ", entUpdCnt=" + entry.updateCounter() +
-                        ", partId=" + entry.partition() +
-                        ", pendingEvts=" + pendingEvts + ']');
-                }
-
-                // Received first event.
-                if (curTop == AffinityTopologyVersion.NONE) {
-                    lastFiredEvt = entry.updateCounter();
-
-                    curTop = entry.topologyVersion();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("First event [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return !entry.isFiltered() ?
-                        F.<CacheEntryEvent<? extends K, ? extends V>>
-                            asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
-                        Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
-                }
-
-                if (curTop.compareTo(entry.topologyVersion()) < 0) {
-                    if (entry.updateCounter() == 1L && !entry.isBackup()) {
-                        entries = new ArrayList<>(pendingEvts.size());
-
-                        for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
-                            if (evt != HOLE && !evt.isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
-                        }
-
-                        pendingEvts.clear();
-
-                        curTop = entry.topologyVersion();
-
-                        lastFiredEvt = entry.updateCounter();
-
-                        if (!entry.isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+    private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
+        CacheContinuousQueryEventBuffer buf = entryBufs.get(part);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
-                                ", curTop=" + curTop +
-                                ", entUpdCnt=" + entry.updateCounter() +
-                                ", partId=" + entry.partition() +
-                                ", pendingEvts=" + pendingEvts + ']');
-
-                        return entries;
-                    }
-
-                    curTop = entry.topologyVersion();
-                }
-
-                // Check duplicate.
-                if (entry.updateCounter() > lastFiredEvt) {
-                    pendingEvts.put(entry.updateCounter(), entry);
-
-                    // Put filtered events.
-                    if (entry.filteredEvents() != null) {
-                        for (long cnrt : entry.filteredEvents()) {
-                            if (cnrt > lastFiredEvt)
-                                pendingEvts.put(cnrt, HOLE);
-                        }
-                    }
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Skip duplicate continuous query message: " + entry);
-
-                    return Collections.emptyList();
-                }
-
-                if (pendingEvts.isEmpty()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return Collections.emptyList();
-                }
-
-                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
-
-                entries = new ArrayList<>();
-
-                if (pendingEvts.size() >= MAX_BUFF_SIZE) {
-                    for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
-                        lastFiredEvt = e.getKey();
-
-                        iter.remove();
-                    }
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() +
-                            ", pendingEvts=" + pendingEvts + ']');
-                    }
-                }
-                else {
-                    // Elements are consistently.
-                    while (iter.hasNext()) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        if (e.getKey() == lastFiredEvt + 1) {
-                            ++lastFiredEvt;
+        if (buf == null) {
+            buf = new CacheContinuousQueryEventBuffer(part) {
+                @Override protected long currentPartitionCounter() {
+                    GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
 
-                            if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+                    if (locPart == null)
+                        return -1L;
 
-                            iter.remove();
-                        }
-                        else
-                            break;
-                    }
+                    return locPart.updateCounter();
                 }
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Will send to listener the following events [entries=" + entries +
-                    ", lastFiredEvt=" + lastFiredEvt +
-                    ", curTop=" + curTop +
-                    ", entUpdCnt=" + entry.updateCounter() +
-                    ", partId=" + entry.partition() +
-                    ", pendingEvts=" + pendingEvts + ']');
-            }
-
-            return entries;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class EntryBuffer {
-        /** */
-        private final static int MAX_BUFF_SIZE = 100;
+            };
 
-        /** */
-        private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>();
-
-        /** */
-        private AtomicLong lastFiredCntr = new AtomicLong();
-
-        /**
-         * @param newVal New value.
-         * @return Old value if previous value less than new value otherwise {@code -1}.
-         */
-        private long updateFiredCounter(long newVal) {
-            long prevVal = lastFiredCntr.get();
-
-            while (prevVal < newVal) {
-                if (lastFiredCntr.compareAndSet(prevVal, newVal))
-                    return prevVal;
-                else
-                    prevVal = lastFiredCntr.get();
-            }
+            CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(part, buf);
 
-            return prevVal >= newVal ? -1 : prevVal;
+            if (oldBuf != null)
+                buf = oldBuf;
         }
 
-        /**
-         * Add continuous entry.
-         *
-         * @param e Cache continuous query entry.
-         * @return Collection entries which will be fired.
-         */
-        public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
-            assert e != null;
-
-            if (e.isFiltered()) {
-                Long last = buf.lastx();
-                Long first = buf.firstx();
-
-                if (last != null && first != null && last - first >= MAX_BUFF_SIZE) {
-                    NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true);
-
-                    GridLongList filteredEvts = new GridLongList((int)(last - first));
-
-                    int size = 0;
-
-                    Long cntr;
-
-                    while ((cntr = prevHoles.pollFirst()) != null) {
-                        filteredEvts.add(cntr);
-
-                        ++size;
-                    }
-
-                    filteredEvts.truncate(size, true);
-
-                    e.filteredEvents(filteredEvts);
-
-                    return e;
-                }
-
-                if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1)
-                    return e;
-                else {
-                    buf.add(e.updateCounter());
-
-                    // Double check. If another thread sent a event with counter higher than this event.
-                    if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
-                        buf.remove(e.updateCounter());
-
-                        return e;
-                    }
-                    else
-                        return null;
-                }
-            }
-            else {
-                long prevVal = updateFiredCounter(e.updateCounter());
-
-                if (prevVal == -1)
-                    return e;
-                else {
-                    NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true);
-
-                    GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal));
-
-                    int size = 0;
-
-                    Long cntr;
-
-                    while ((cntr = prevHoles.pollFirst()) != null) {
-                        filteredEvts.add(cntr);
-
-                        ++size;
-                    }
-
-                    filteredEvts.truncate(size, true);
-
-                    e.filteredEvents(filteredEvts);
-
-                    return e;
-                }
-            }
-        }
+        return buf;
     }
 
     /** {@inheritDoc} */
     @Override public void onNodeLeft() {
-        Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+        nodeLeft = true;
 
-        if (backupQueue0 != null)
-            backupQueue = null;
+        for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+            CacheContinuousQueryEventBuffer buf = bufE.getValue();
+
+            buf.flushOnExchange();
+        }
     }
 
     /** {@inheritDoc} */
@@ -1290,7 +968,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert ctx.config().isPeerClassLoadingEnabled();
 
         if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
-            rmtFilterDep = new DeployableObject(rmtFilter, ctx);
+            rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx);
     }
 
     /** {@inheritDoc} */
@@ -1411,7 +1089,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         boolean b = in.readBoolean();
 
         if (b)
-            rmtFilterDep = (DeployableObject)in.readObject();
+            rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject();
         else
             rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
 
@@ -1436,95 +1114,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
-    /** */
-    private static class AcknowledgeBuffer {
-        /** */
-        private int size;
-
-        /** */
-        @GridToStringInclude
-        private Map<Integer, Long> updateCntrs = new HashMap<>();
-
-        /** */
-        @GridToStringInclude
-        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
-
-        /**
-         * @param batch Batch.
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @SuppressWarnings("unchecked")
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-        onAcknowledged(GridContinuousBatch batch) {
-            assert batch instanceof GridContinuousQueryBatch;
-
-            size += ((GridContinuousQueryBatch)batch).entriesCount();
-
-            Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
-
-            for (CacheContinuousQueryEntry e : entries)
-                addEntry(e);
-
-            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
-        }
-
-        /**
-         * @param e Entry.
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-        onAcknowledged(CacheContinuousQueryEntry e) {
-            size++;
-
-            addEntry(e);
-
-            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
-        }
-
-        /**
-         * @param e Entry.
-         */
-        private void addEntry(CacheContinuousQueryEntry e) {
-            topVers.add(e.topologyVersion());
-
-            Long cntr0 = updateCntrs.get(e.partition());
-
-            if (cntr0 == null || e.updateCounter() > cntr0)
-                updateCntrs.put(e.partition(), e.updateCounter());
-        }
-
-        /**
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-            acknowledgeOnTimeout() {
-            return size > 0 ? acknowledgeData() : null;
-        }
-
-        /**
-         * @return Tuple with acknowledge information.
-         */
-        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
-            assert size > 0;
-
-            Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
-
-            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
-                new IgniteBiTuple<>(cntrs, topVers);
-
-            topVers = U.newHashSet(1);
-
-            size = 0;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(AcknowledgeBuffer.class, this);
-        }
-    }
-
     /**
      *
      */
@@ -1560,44 +1149,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         /** {@inheritDoc} */
         @Override public void run() {
-            final boolean notify = filter(evt, primary);
-
-            if (!primary())
-                return;
+            final boolean notify = filter(evt);
 
-            if (fut == null) {
-                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+            if (primary || skipPrimaryCheck) {
+                if (fut == null) {
+                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
 
-                return;
-            }
+                    return;
+                }
 
-            if (fut.isDone()) {
-                if (fut.error() != null)
-                    evt.entry().markFiltered();
+                if (fut.isDone()) {
+                    if (fut.error() != null)
+                        evt.entry().markFiltered();
 
-                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
-            }
-            else {
-                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        if (f.error() != null)
-                            evt.entry().markFiltered();
-
-                        ctx.asyncCallbackPool().execute(new Runnable() {
-                            @Override public void run() {
-                                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
-                            }
-                        }, evt.entry().partition());
-                    }
-                });
+                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                }
+                else {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> f) {
+                            if (f.error() != null)
+                                evt.entry().markFiltered();
+
+                            ctx.asyncCallbackPool().execute(new Runnable() {
+                                @Override public void run() {
+                                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                                }
+                            }, evt.entry().partition());
+                        }
+                    });
+                }
             }
-        }
-
-        /**
-         * @return {@code True} if event fired on this node.
-         */
-        private boolean primary() {
-            return primary || skipPrimaryCheck;
+            else
+                handleBackupEntry(cacheContext(ctx), evt.entry());
         }
 
         /** {@inheritDoc} */
@@ -1606,82 +1189,4 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
     }
 
-    /**
-     * Deployable object.
-     */
-    protected static class DeployableObject implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Serialized object. */
-        private byte[] bytes;
-
-        /** Deployment class name. */
-        private String clsName;
-
-        /** Deployment info. */
-        private GridDeploymentInfo depInfo;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public DeployableObject() {
-            // No-op.
-        }
-
-        /**
-         * @param obj Object.
-         * @param ctx Kernal context.
-         * @throws IgniteCheckedException In case of error.
-         */
-        protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
-            assert obj != null;
-            assert ctx != null;
-
-            Class cls = U.detectClass(obj);
-
-            clsName = cls.getName();
-
-            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
-
-            depInfo = new GridDeploymentInfoBean(dep);
-
-            bytes = U.marshal(ctx, obj);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param ctx Kernal context.
-         * @return Deserialized object.
-         * @throws IgniteCheckedException In case of error.
-         */
-        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
-            assert ctx != null;
-
-            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
-
-            return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeByteArray(out, bytes);
-            U.writeString(out, clsName);
-            out.writeObject(depInfo);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            bytes = U.readByteArray(in);
-            clsName = U.readString(in);
-            depInfo = (GridDeploymentInfo)in.readObject();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 7aef4dd..e48d22e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -44,7 +44,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 
     /** Deployable object for filter factory. */
-    private DeployableObject rmtFilterFactoryDep;
+    private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
 
     /** Event types for JCache API. */
     private byte types;
@@ -122,7 +122,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         super.p2pMarshal(ctx);
 
         if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
-            rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+            rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
     }
 
     /** {@inheritDoc} */
@@ -167,7 +167,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         boolean b = in.readBoolean();
 
         if (b)
-            rmtFilterFactoryDep = (DeployableObject)in.readObject();
+            rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
         else
             rmtFilterFactory = (Factory)in.readObject();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index acf351f..1a655e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -193,7 +193,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 lsnr.keepBinary(),
                 partId,
                 updCntr,
-                topVer);
+                topVer,
+                (byte)0);
 
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -339,7 +340,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 lsnr.keepBinary(),
                 partId,
                 updateCntr,
-                topVer);
+                topVer,
+                (byte)0);
 
             IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
 
@@ -400,7 +402,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                     lsnr.keepBinary(),
                     e.partition(),
                     -1,
-                    null);
+                    null,
+                    (byte)0);
 
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -568,9 +571,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * @param topVer Topology version.
+     * @param topVer Finished exchange topology version.
      */
-    public void beforeExchange(AffinityTopologyVersion topVer) {
+    public void flushBackupQueue(AffinityTopologyVersion topVer) {
         for (CacheContinuousQueryListener lsnr : lsnrs.values())
             lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
     }
@@ -703,7 +706,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     keepBinary,
                                     0,
                                     -1,
-                                    null);
+                                    null,
+                                    (byte)0);
 
                                 next = new CacheContinuousQueryEvent<>(
                                     cctx.kernalContext().cache().jcache(cctx.name()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
new file mode 100644
index 0000000..e210c24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Buffers out-of-order continuous query events and releases them in update counter order.
+ */
+class CacheContinuousQueryPartitionRecovery {
+    /** Event which means hole in sequence. */
+    private static final CacheContinuousQueryEntry HOLE;
+
+    static {
+        HOLE = new CacheContinuousQueryEntry();
+
+        HOLE.markFiltered();
+    }
+
+    /** Maximum number of pending events kept before the buffer is forcibly drained. */
+    private static final int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Update counter of the last processed (fired or skipped as filtered) event. */
+    private long lastFiredEvt;
+
+    /** Cached topology version, {@code NONE} until initialized or after {@link #resetTopologyCache()}. */
+    private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
+
+    /** Pending events ordered by update counter. */
+    private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+
+    /**
+     * @param log Logger.
+     * @param topVer Topology version.
+     * @param initCntr Initial update counter, {@code null} if not known.
+     */
+    CacheContinuousQueryPartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+        this.log = log;
+
+        if (initCntr != null) {
+            assert topVer.topologyVersion() > 0 : topVer;
+
+            this.lastFiredEvt = initCntr;
+
+            curTop = topVer;
+        }
+    }
+
+    /**
+     * Resets cached topology.
+     */
+    void resetTopologyCache() {
+        curTop = AffinityTopologyVersion.NONE;
+    }
+
+    /**
+     * Adds continuous query entry and collects events which are ready to be fired.
+     *
+     * @param entry Cache continuous query entry.
+     * @param cctx Cache context.
+     * @param cache Cache.
+     * @return Collection of events to be fired. This collection should contain only non-filtered events.
+     */
+    <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+        CacheContinuousQueryEntry entry,
+        GridCacheContext cctx,
+        IgniteCache cache
+    ) {
+        assert entry != null;
+
+        if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+            assert entry.updateCounter() == 0L : entry;
+
+            return F.<CacheEntryEvent<? extends K, ? extends V>>
+                asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+        }
+
+        List<CacheEntryEvent<? extends K, ? extends V>> entries;
+
+        synchronized (pendingEvts) {
+            if (log.isDebugEnabled()) {
+                log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
+                    ", curTop=" + curTop +
+                    ", entUpdCnt=" + entry.updateCounter() +
+                    ", partId=" + entry.partition() +
+                    ", pendingEvts=" + pendingEvts + ']');
+            }
+
+            // Received first event.
+            if (curTop == AffinityTopologyVersion.NONE) {
+                lastFiredEvt = entry.updateCounter();
+
+                curTop = entry.topologyVersion();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("First event [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() + ']');
+                }
+
+                return !entry.isFiltered() ?
+                    F.<CacheEntryEvent<? extends K, ? extends V>>
+                        asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
+                    Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
+            }
+
+            if (curTop.compareTo(entry.topologyVersion()) < 0) {
+                if (entry.updateCounter() == 1L && !entry.isBackup()) {
+                    entries = new ArrayList<>(pendingEvts.size());
+
+                    for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+                        if (evt != HOLE && !evt.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
+                    }
+
+                    pendingEvts.clear();
+
+                    curTop = entry.topologyVersion();
+
+                    lastFiredEvt = entry.updateCounter();
+
+                    if (!entry.isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                            ", curTop=" + curTop +
+                            ", entUpdCnt=" + entry.updateCounter() +
+                            ", partId=" + entry.partition() +
+                            ", pendingEvts=" + pendingEvts + ']');
+
+                    return entries;
+                }
+
+                curTop = entry.topologyVersion();
+            }
+
+            // Check duplicate.
+            if (entry.updateCounter() > lastFiredEvt)
+                pendingEvts.put(entry.updateCounter(), entry);
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Skip duplicate continuous query message: " + entry);
+
+                return Collections.emptyList();
+            }
+
+            if (pendingEvts.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() + ']');
+                }
+
+                return Collections.emptyList();
+            }
+
+            Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
+
+            entries = new ArrayList<>();
+
+            if (pendingEvts.size() >= MAX_BUFF_SIZE) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() +
+                        ", pendingEvts=" + pendingEvts + ']');
+                }
+
+                LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
+                    ", bufSize=" + MAX_BUFF_SIZE +
+                    ", partId=" + entry.partition() + ']');
+
+                for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+
+                    lastFiredEvt = e.getKey();
+
+                    iter.remove();
+                }
+            }
+            else {
+                boolean skippedFiltered = false;
+
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    CacheContinuousQueryEntry pending = e.getValue();
+
+                    long filtered = pending.filteredCount();
+
+                    boolean fire = e.getKey() == lastFiredEvt + 1;
+
+                    if (!fire && filtered > 0)
+                        fire = e.getKey() - filtered <= lastFiredEvt + 1;
+
+                    if (fire) {
+                        lastFiredEvt = e.getKey();
+
+                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending));
+
+                        iter.remove();
+                    }
+                    else {
+                        if (pending.isFiltered())
+                            skippedFiltered = true;
+                        else
+                            break;
+                    }
+                }
+
+                if (skippedFiltered)
+                    pendingEvts.headMap(lastFiredEvt).clear();
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Will send to listener the following events [entries=" + entries +
+                ", lastFiredEvt=" + lastFiredEvt +
+                ", curTop=" + curTop +
+                ", entUpdCnt=" + entry.updateCounter() +
+                ", partId=" + entry.partition() +
+                ", pendingEvts=" + pendingEvts + ']');
+        }
+
+        return entries;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
index 4540de1..597eae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -25,7 +25,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 public class GridContinuousBatchAdapter implements GridContinuousBatch {
     /** Buffer. */
-    private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+    protected final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
 
     /** {@inheritDoc} */
     @Override public void add(Object obj) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index abcd8ea..a72dcd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -75,7 +75,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -872,10 +871,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null);
         }
         else {
-            LocalRoutineInfo localRoutineInfo = locInfos.get(routineId);
+            LocalRoutineInfo locRoutineInfo = locInfos.get(routineId);
 
-            if (localRoutineInfo != null)
-                localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
+            if (locRoutineInfo != null)
+                locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
         }
     }
 
@@ -897,7 +896,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
-        assert !msg || obj instanceof Message : obj;
+        assert !msg || (obj instanceof Message || obj instanceof Collection) : obj;
 
         assert !nodeId.equals(ctx.localNodeId());
 
@@ -917,7 +916,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
+                    sendNotification(nodeId,
+                        routineId,
+                        futId,
+                        obj instanceof Collection ? (Collection)obj : F.asList(obj),
+                        null,
+                        msg,
+                        null);
 
                     info.hnd.onBatchAcknowledged(routineId, info.add(obj), ctx);
                 }
@@ -1563,7 +1568,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         GridContinuousBatch addAll(Collection<?> objs) {
             assert objs != null;
 
-            GridContinuousBatch toSnd = null;
+            GridContinuousBatch toSnd;
 
             lock.writeLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
index c5d854b..0eba44b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.continuous;
 
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
@@ -31,11 +32,20 @@ public class GridContinuousQueryBatch extends GridContinuousBatchAdapter {
     /** {@inheritDoc} */
     @Override public void add(Object obj) {
         assert obj != null;
-        assert obj instanceof CacheContinuousQueryEntry;
+        assert obj instanceof CacheContinuousQueryEntry || obj instanceof List;
 
-        super.add(obj);
+        if (obj instanceof CacheContinuousQueryEntry) {
+            buf.add(obj);
 
-        size.addAndGet(((CacheContinuousQueryEntry)obj).size());
+            size.incrementAndGet();
+        }
+        else {
+            List<Object> objs = (List<Object>)obj;
+
+            buf.addAll(objs);
+
+            size.addAndGet(objs.size());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
index 3cab9e0..d505d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.CacheMode;
  */
 public class CacheContinuousQueryAsyncFailoverAtomicSelfTest
     extends CacheContinuousQueryFailoverAbstractSelfTest {
-
     /** {@inheritDoc} */
     @Override protected CacheMode cacheMode() {
         return CacheMode.PARTITIONED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
new file mode 100644
index 0000000..9c7c836
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests that a continuous query listener registered through a client node receives every update made
+ * concurrently from several threads to keys of a single partition, both when the query is registered
+ * before the updates start and when it is registered while updates are already in progress.
+ */
+public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommonAbstractTest {
+    /** IP finder installed into the discovery SPI of every node configuration built by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client-mode flag applied to the configuration of every node started after it is set. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks event delivery under concurrent updates of a single partition for an {@code ATOMIC} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionAtomic() throws Exception {
+        concurrentUpdatePartition(ATOMIC);
+    }
+
+    /**
+     * Checks event delivery under concurrent updates of a single partition for a {@code TRANSACTIONAL} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionTx() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL);
+    }
+
+    /**
+     * Starts a server node and a client node, creates the cache and registers a continuous query through
+     * the client, then repeatedly updates a fixed set of keys that all map to partition 0 from several
+     * threads through the server node's cache. After each round the listener must have counted exactly
+     * {@code THREADS * UPDATES} events; the counter is then reset for the next round.
+     * <p>
+     * The listener also asserts that every event carries a non-null key and a non-null value.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite client = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = client.createCache(ccfg);
+
+        final AtomicInteger evtCnt = new AtomicInteger();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                for (CacheEntryEvent evt : evts) {
+                    assertNotNull(evt.getKey());
+                    assertNotNull(evt.getValue());
+
+                    evtCnt.incrementAndGet();
+                }
+            }
+        });
+
+        clientCache.query(qry);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final List<Integer> keys = new ArrayList<>();
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < 100_000; i++) {
+            if (aff.partition(i) == 0) {
+                keys.add(i);
+
+                if (keys.size() == KEYS)
+                    break;
+            }
+        }
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 15; i++) {
+            log.info("Iteration: " + i);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < UPDATES; i++)
+                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                    return null;
+                }
+            }, THREADS, "update");
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    log.info("Events: " + evtCnt.get());
+
+                    return evtCnt.get() >= THREADS * UPDATES;
+                }
+            }, 5000);
+
+            assertEquals(THREADS * UPDATES, evtCnt.get());
+
+            evtCnt.set(0);
+        }
+    }
+
+    /**
+     * Checks event delivery when the query is registered during concurrent updates, for an {@code ATOMIC} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartAtomic() throws Exception {
+        concurrentUpdatesAndQueryStart(ATOMIC);
+    }
+
+    /**
+     * Checks event delivery when the query is registered during concurrent updates, for a
+     * {@code TRANSACTIONAL} cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatesAndQueryStartTx() throws Exception {
+        concurrentUpdatesAndQueryStart(TRANSACTIONAL);
+    }
+
+    /**
+     * Starts a server node and a client node and, on each iteration, registers a new continuous query
+     * through the client while several threads are putting negative values to keys of partition 0.
+     * Once those threads have finished, {@code THREADS * UPDATES} puts of non-negative values are started
+     * asynchronously (the returned future is not awaited) and the listener, which counts only events with
+     * non-negative values, must end up with exactly that many events before the cursor is closed.
+     * <p>
+     * Values written during the first phase are always negative ({@code rnd.nextInt(100) - 200}), so
+     * events produced by that phase never contribute to the checked count.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatesAndQueryStart(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite client = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = client.createCache(ccfg);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final List<Integer> keys = new ArrayList<>();
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < 100_000; i++) {
+            if (aff.partition(i) == 0) {
+                keys.add(i);
+
+                if (keys.size() == KEYS)
+                    break;
+            }
+        }
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final AtomicInteger evtCnt = new AtomicInteger();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                    for (CacheEntryEvent evt : evts) {
+                        assertNotNull(evt.getKey());
+                        assertNotNull(evt.getValue());
+
+                        if ((Integer)evt.getValue() >= 0)
+                            evtCnt.incrementAndGet();
+                    }
+                }
+            });
+
+            QueryCursor cur;
+
+            final IgniteCache<Object, Object> srvCache = srv.cache(DEFAULT_CACHE_NAME);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (!stop.get())
+                            srvCache.put(keys.get(rnd.nextInt(KEYS)), rnd.nextInt(100) - 200);
+
+                        return null;
+                    }
+                }, THREADS, "update");
+
+                U.sleep(1000);
+
+                cur = clientCache.query(qry);
+
+                U.sleep(1000);
+
+                stop.set(true);
+
+                fut.get();
+            }
+            finally {
+                stop.set(true);
+            }
+
+            GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < UPDATES; i++)
+                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                    return null;
+                }
+            }, THREADS, "update");
+
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    log.info("Events: " + evtCnt.get());
+
+                    return evtCnt.get() >= THREADS * UPDATES;
+                }
+            }, 5000);
+
+            assertEquals(THREADS * UPDATES, evtCnt.get());
+
+            cur.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
new file mode 100644
index 0000000..382f166
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
+import javax.cache.event.EventType;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests {@link CacheContinuousQueryEventBuffer}: entries with consecutive update counters are passed to
+ * the buffer in random order, from one or several threads, and the entries it returns are compared with
+ * the expected non-filtered entries by update counter and filtered count.
+ */
+@SuppressWarnings("unchecked")
+public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest {
+    /**
+     * Runs the randomized buffer checks from a single thread.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBuffer1() throws Exception {
+        testBuffer(1);
+    }
+
+    /**
+     * Runs the randomized buffer checks with 10 threads, repeating the whole series 10 times.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBuffer2() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            testBuffer(10);
+        }
+    }
+
+    /**
+     * Runs a series of randomized checks using a {@link Random} whose seed is taken from
+     * {@code System.nanoTime()} and logged. First, fresh buffers are checked with several fixed filter
+     * ratios and counters starting at 1; then a single buffer is reused across rounds with a random
+     * filter ratio, each round continuing the counter where the previous one ended.
+     *
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(int threads) throws Exception {
+        long seed = System.nanoTime();
+
+        Random rnd = new Random(seed);
+
+        log.info("Start test, seed: " + seed);
+
+        for (int i = 0; i < 10; i++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.5f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.9f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.99f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.01f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.f, threads);
+        }
+
+        CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(0);
+
+        long cntr = 1;
+
+        for (int i = 0; i < 10; i++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+            float ratio = rnd.nextFloat();
+
+            testBuffer(rnd, b, cnt, cntr, ratio, threads);
+
+            cntr += cnt;
+        }
+    }
+
+    /**
+     * Generates {@code cnt} entries with consecutive update counters starting at {@code cntr}, marks a
+     * random share of them as filtered, shuffles them and passes them to the buffer. The entries returned
+     * by the buffer must match the non-filtered entries in counter order, each carrying as its filtered
+     * count the number of filtered entries generated since the previous non-filtered one (the first also
+     * includes the buffer's {@code currentFiltered()} value at the start of the call).
+     * <p>
+     * With one thread the order in which the buffer returns entries is checked directly; with several
+     * threads returned entries are collected into a map sorted by update counter before comparison.
+     *
+     * @param rnd Random.
+     * @param b Buffer.
+     * @param cnt Entries count.
+     * @param cntr Update counter assigned to the first generated entry.
+     * @param filterRatio Probability with which each generated entry is marked as filtered.
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(Random rnd,
+        final CacheContinuousQueryEventBuffer b,
+        int cnt,
+        long cntr,
+        float filterRatio,
+        int threads)
+        throws Exception
+    {
+        List<CacheContinuousQueryEntry> expEntries = new ArrayList<>();
+
+        List<CacheContinuousQueryEntry> entries = new ArrayList<>();
+
+        long filtered = b.currentFiltered();
+
+        for (int i = 0; i < cnt; i++) {
+            CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+                0,
+                EventType.CREATED,
+                null,
+                null,
+                null,
+                false,
+                0,
+                cntr,
+                null,
+                (byte)0);
+
+            entries.add(entry);
+
+            if (rnd.nextFloat() < filterRatio) {
+                entry.markFiltered();
+
+                filtered++;
+            }
+            else {
+                CacheContinuousQueryEntry expEntry = new CacheContinuousQueryEntry(
+                    0,
+                    EventType.CREATED,
+                    null,
+                    null,
+                    null,
+                    false,
+                    0,
+                    cntr,
+                    null,
+                    (byte)0);
+
+                expEntry.filteredCount(filtered);
+
+                expEntries.add(expEntry);
+
+                filtered = 0;
+            }
+
+            cntr++;
+        }
+
+        Collections.shuffle(entries, rnd);
+
+        List<CacheContinuousQueryEntry> actualEntries = new ArrayList<>(expEntries.size());
+
+        if (threads == 1) {
+            for (int i = 0; i < entries.size(); i++) {
+                Object o = entries.get(i);
+
+                Object res = b.processEntry((CacheContinuousQueryEntry)o, false);
+
+                if (res != null) {
+                    if (res instanceof CacheContinuousQueryEntry)
+                        actualEntries.add((CacheContinuousQueryEntry)res);
+                    else
+                        actualEntries.addAll((List<CacheContinuousQueryEntry>)res);
+                }
+            }
+        }
+        else {
+            final CyclicBarrier barrier = new CyclicBarrier(threads);
+
+            final ConcurrentLinkedQueue<CacheContinuousQueryEntry> q = new ConcurrentLinkedQueue<>(entries);
+
+            final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 = new ConcurrentSkipListMap<>();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    barrier.await();
+
+                    Object o;
+
+                    while ((o = q.poll()) != null) {
+                        Object res = b.processEntry((CacheContinuousQueryEntry)o, false);
+
+                        if (res != null) {
+                            if (res instanceof CacheContinuousQueryEntry)
+                                act0.put(((CacheContinuousQueryEntry)res).updateCounter(), (CacheContinuousQueryEntry)res);
+                            else {
+                                for (CacheContinuousQueryEntry e : ((List<CacheContinuousQueryEntry>)res))
+                                    act0.put(e.updateCounter(), e);
+                            }
+                        }
+                    }
+
+                    return null;
+                }
+            }, threads, "test");
+
+            actualEntries.addAll(act0.values());
+        }
+
+        assertEquals(expEntries.size(), actualEntries.size());
+
+        for (int i = 0; i < expEntries.size(); i++) {
+            CacheContinuousQueryEntry expEvt = expEntries.get(i);
+            CacheContinuousQueryEntry actualEvt = actualEntries.get(i);
+
+            assertEquals(expEvt.updateCounter(), actualEvt.updateCounter());
+            assertEquals(expEvt.filteredCount(), actualEvt.filteredCount());
+        }
+    }
+}


Mime
View raw message