ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [17/50] [abbrv] ignite git commit: IGNITE-426 Implemented failover for Continuous query.
Date Sat, 21 Nov 2015 02:01:55 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1219f2f..72a60d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    /** Previous values. */
+    @GridToStringInclude
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> prevVals;
+
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -139,10 +144,19 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Partition update counters. */
+    private GridLongList updateCntrs;
+
     /** On response flag. Access should be synced on future. */
     @GridDirectTransient
     private boolean onRes;
 
+    @GridDirectTransient
+    private List<Integer> partIds;
+
+    @GridDirectTransient
+    private List<CacheObject> localPrevVals;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -193,6 +207,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.addDepInfo = addDepInfo;
 
         keys = new ArrayList<>();
+        partIds = new ArrayList<>();
+        localPrevVals = new ArrayList<>();
 
         if (forceTransformBackups) {
             entryProcessors = new ArrayList<>();
@@ -216,15 +232,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param prevVal Previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        int partId,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx) {
         keys.add(key);
 
+        partIds.add(partId);
+
+        localPrevVals.add(prevVal);
+
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
@@ -233,6 +259,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         else
             vals.add(val);
 
+        if (addPrevVal) {
+            if (prevVals == null)
+                prevVals = new ArrayList<>();
+
+            prevVals.add(prevVal);
+        }
+
+        if (updateIdx != null) {
+            if (updateCntrs == null)
+                updateCntrs = new GridLongList();
+
+            updateCntrs.add(updateIdx);
+        }
+
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -283,8 +323,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
-        long expireTime)
-    {
+        long expireTime) {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
 
@@ -415,6 +454,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
+     * @param idx Key index.
+     * @return Partition ID recorded for the key at the given index.
+     */
+    public int partitionId(int idx) {
+        return partIds.get(idx);
+    }
+
+    /**
+     * @param updCntr Index in the update counters list.
+     * @return Update counter at the given index, or {@code null} if no counter is recorded for it.
+     */
+    public Long updateCounter(int updCntr) {
+        if (updateCntrs != null && updCntr < updateCntrs.size())
+            return updateCntrs.get(updCntr);
+
+        return null;
+    }
+
+    /**
      * @param idx Near key index.
      * @return Key.
      */
@@ -435,6 +493,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
+     * @return Previous value, or {@code null} if none is available.
+     */
+    @Nullable public CacheObject previousValue(int idx) {
+        if (prevVals != null)
+            return prevVals.get(idx);
+
+        return null;
+    }
+
+    /**
+     * @param idx Key index.
+     * @return Local previous value (may be {@code null}).
+     */
+    @Nullable public CacheObject localPreviousValue(int idx) {
+        return localPrevVals.get(idx);
+    }
+
+    /**
+     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -544,8 +621,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return invokeArgs;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
@@ -695,42 +771,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("ttls", ttls))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeMessage("updateCntrs", updateCntrs))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -857,7 +945,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 16:
-                subjId = reader.readUuid("subjId");
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -865,6 +953,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 17:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -876,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -884,7 +980,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -892,7 +988,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -900,7 +996,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
+                updateCntrs = reader.readMessage("updateCntrs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -908,7 +1012,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 22:
+            case 24:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -928,7 +1032,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2f2944d..43f34c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                                 if (updateTop) {
                                     for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
                                         if (top.cacheId() == cacheCtx.cacheId()) {
-                                            cacheCtx.topology().update(exchId, top.partitionMap(true));
+                                            cacheCtx.topology().update(exchId,
+                                                top.partitionMap(true),
+                                                top.updateCounters());
 
                                             break;
                                         }
@@ -813,6 +815,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     }
                 }
 
+                boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -823,6 +827,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     if (drCacheCtx.isDrEnabled())
                         drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
 
+                    if (topChanged)
+                        cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
                     // Partition release future is done so we can flush the write-behind store.
                     cacheCtx.store().forceFlush();
 
@@ -956,14 +963,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param id ID.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
+    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+        throws IgniteCheckedException {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
             cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal())
+            if (!cacheCtx.isLocal()) {
                 m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+
+                m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+            }
         }
 
         if (log.isDebugEnabled())
@@ -989,15 +1000,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
 
-                if (ready)
+                if (ready) {
                     m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+                    m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+                }
             }
         }
 
         // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
             m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
 
+            m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+        }
+
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                 ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1334,15 +1351,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
 
+            Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId);
+
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             if (cacheCtx != null)
-                cacheCtx.topology().update(exchId, entry.getValue());
+                cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
             else {
                 ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
             }
         }
     }
@@ -1360,7 +1379,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue());
+            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index c06d773..3f4f9bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
@@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** */
     private byte[] partsBytes;
 
+    /** Partitions update counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partCntrs;
+
+    /** Serialized partitions counters. */
+    private byte[] partCntrsBytes;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -92,13 +101,41 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             parts.put(cacheId, fullMap);
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Partition update counters.
+     */
+    public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+        if (partCntrs == null)
+            partCntrs = new HashMap<>();
+
+        if (!partCntrs.containsKey(cacheId))
+            partCntrs.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+        if (partCntrs != null) {
+            Map<Integer, Long> res = partCntrs.get(cacheId);
+
+            return res != null ? res : Collections.<Integer, Long>emptyMap();
+        }
+
+        return Collections.emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
         if (parts != null && partsBytes == null)
             partsBytes = ctx.marshaller().marshal(parts);
+
+        if (partCntrs != null)
+            partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 
     /**
@@ -121,6 +158,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+        if (partCntrsBytes != null)
+            partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -139,12 +179,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 5:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -167,7 +213,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 5:
-                partsBytes = reader.readByteArray("partsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -175,6 +221,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 6:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -194,7 +248,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..a2366bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
@@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions. */
     private byte[] partsBytes;
 
+    /** Partitions update counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partCntrs;
+
+    /** Serialized partitions counters. */
+    private byte[] partCntrsBytes;
+
     /** */
     private boolean client;
 
@@ -90,6 +99,31 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Partition update counters.
+     */
+    public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+        if (partCntrs == null)
+            partCntrs = new HashMap<>();
+
+        partCntrs.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+        if (partCntrs != null) {
+            Map<Integer, Long> res = partCntrs.get(cacheId);
+
+            return res != null ? res : Collections.<Integer, Long>emptyMap();
+        }
+
+        return Collections.emptyMap();
+    }
+
+    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionMap> partitions() {
@@ -103,6 +137,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
+
+        if (partCntrs != null)
+            partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 
     /** {@inheritDoc} */
@@ -111,6 +148,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+        if (partCntrsBytes != null)
+            partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -135,6 +175,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -165,6 +211,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 6:
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -184,7 +238,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1bf03a9..706655b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         /*write-through*/false,
                         /*read-through*/false,
                         /*retval*/false,
-                        /**expiry policy*/null,
+                        /*expiry policy*/null,
                         /*event*/true,
                         /*metrics*/true,
                         /*primary*/false,
@@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         false,
                         false,
                         subjId,
-                        taskName);
+                        taskName,
+                        null,
+                        null);
 
                     if (updRes.removeVersion() != null)
                         ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -361,7 +363,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             false,
                             intercept,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            null,
+                            null);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index d078df4..ba58f57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -226,6 +226,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /**
+     * @param cntrs Partition update counters.
+     */
+    @Override public void setPartitionUpdateCounters(long[] cntrs) {
+        // No-op.
+    }
+
+    /**
      * Adds owned versions to map.
      *
      * @param vers Map of owned versions.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
new file mode 100644
index 0000000..7db9026
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Batch acknowledgement.
+ */
+public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Update counters. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+    private Map<Integer, Long> updateCntrs;
+
+    /**
+     * Default constructor.
+     */
+    public CacheContinuousQueryBatchAck() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param routineId Routine ID.
+     * @param updateCntrs Update counters.
+     */
+    CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateCntrs) {
+        this.cacheId = cacheId;
+        this.routineId = routineId;
+        this.updateCntrs = updateCntrs;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Update counters.
+     */
+    Map<Integer, Long> updateCntrs() {
+        return updateCntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("updateCntrs", updateCntrs, MessageCollectionItemType.INT,
+                    MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                updateCntrs = reader.readMap("updateCntrs", MessageCollectionItemType.INT,
+                    MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CacheContinuousQueryBatchAck.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 118;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryBatchAck.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index a4b35eb..0495e6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -22,10 +22,12 @@ import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -42,6 +44,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final byte BACKUP_ENTRY = 0b0001;
+
+    /** */
+    private static final byte FILTERED_ENTRY = 0b0010;
+
+    /** */
     private static final EventType[] EVT_TYPE_VALS = EventType.values();
 
     /**
@@ -75,8 +83,24 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     @GridDirectTransient
     private GridDeploymentInfo depInfo;
 
+    /** Partition. */
+    private int part;
+
+    /** Update counter. */
+    private long updateCntr;
+
+    /** Flags. */
+    private byte flags;
+
+    /** */
+    @GridToStringInclude
+    private AffinityTopologyVersion topVer;
+
+    /** Filtered events. */
+    private GridLongList filteredEvts;
+
     /**
-     * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
+     * Required by {@link Message}.
      */
     public CacheContinuousQueryEntry() {
         // No-op.
@@ -88,18 +112,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param part Partition.
+     * @param updateCntr Update partition counter.
+     * @param topVer Topology version if applicable.
      */
     CacheContinuousQueryEntry(
         int cacheId,
         EventType evtType,
         KeyCacheObject key,
         @Nullable CacheObject newVal,
-        @Nullable CacheObject oldVal) {
+        @Nullable CacheObject oldVal,
+        int part,
+        long updateCntr,
+        @Nullable AffinityTopologyVersion topVer) {
         this.cacheId = cacheId;
         this.evtType = evtType;
         this.key = key;
         this.newVal = newVal;
         this.oldVal = oldVal;
+        this.part = part;
+        this.updateCntr = updateCntr;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Topology version if applicable.
+     */
+    @Nullable AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -117,6 +157,66 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return Partition.
+     */
+    int partition() {
+        return part;
+    }
+
+    /**
+     * @return Update counter.
+     */
+    long updateCounter() {
+        return updateCntr;
+    }
+
+    /**
+     * Marks that the entry was created on a backup node.
+     */
+    void markBackup() {
+        flags |= BACKUP_ENTRY;
+    }
+
+    /**
+     * Marks the entry as filtered and clears its key, values and deployment info.
+     */
+    void markFiltered() {
+        flags |= FILTERED_ENTRY;
+        newVal = null;
+        oldVal = null;
+        key = null;
+        depInfo = null;
+    }
+
+    /**
+     * @return {@code True} if entry sent by backup node.
+     */
+    boolean isBackup() {
+        return (flags & BACKUP_ENTRY) != 0;
+    }
+
+    /**
+     * @return {@code True} if entry was filtered.
+     */
+    boolean isFiltered() {
+        return (flags & FILTERED_ENTRY) != 0;
+    }
+
+    /**
+     * @param cntrs Filtered events.
+     */
+    void filteredEvents(GridLongList cntrs) {
+        filteredEvts = cntrs;
+    }
+
+    /**
+     * @return Previous filtered events or {@code null} if none were set.
+     */
+    long[] filteredEvents() {
+        return filteredEvts == null ? null : filteredEvts.array();
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
@@ -138,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @throws IgniteCheckedException In case of error.
      */
     void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+        if (!isFiltered()) {
+            key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
-        if (newVal != null)
-            newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (newVal != null)
+                newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
-        if (oldVal != null)
-            oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (oldVal != null)
+                oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+        }
     }
 
     /**
@@ -208,23 +310,53 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage("key", key))
+                if (!writer.writeMessage("filteredEvts", filteredEvts))
                     return false;
 
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeMessage("newVal", newVal))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("newVal", newVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("oldVal", oldVal))
                     return false;
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeLong("updateCntr", updateCntr))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -259,7 +391,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 2:
-                key = reader.readMessage("key");
+                filteredEvts = reader.readMessage("filteredEvts");
 
                 if (!reader.isLastRead())
                     return false;
@@ -267,7 +399,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 3:
-                newVal = reader.readMessage("newVal");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -275,6 +407,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 4:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                newVal = reader.readMessage("newVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 oldVal = reader.readMessage("oldVal");
 
                 if (!reader.isLastRead())
@@ -282,6 +430,30 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
+            case 7:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                updateCntr = reader.readLong("updateCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -289,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 7417138..a1ebe39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return e.key().value(cctx.cacheObjectContext(), false);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e517c70..b69d4cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -21,8 +21,21 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -30,26 +43,37 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -61,6 +85,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int BACKUP_ACK_THRESHOLD = 100;
+
     /** Cache name. */
     private String cacheName;
 
@@ -97,9 +124,27 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
+    /** Backup queue. */
+    private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+    /** */
+    private boolean localCache;
+
+    /** */
+    private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+
+    /** */
+    private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
+
+    /** */
+    private transient AcknowledgeBuffer ackBuf;
+
     /** */
     private transient int cacheId;
 
+    /** */
+    private Map<Integer, Long> initUpdCntrs;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -121,6 +166,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param ignoreExpired Ignore expired events flag.
      * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
      * @param taskHash Task name hash code.
+     * @param locCache {@code True} if local cache.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
@@ -133,7 +179,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         boolean sync,
         boolean ignoreExpired,
         int taskHash,
-        boolean skipPrimaryCheck) {
+        boolean skipPrimaryCheck,
+        boolean locCache) {
         assert topic != null;
         assert locLsnr != null;
 
@@ -148,6 +195,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
+        this.localCache = locCache;
 
         cacheId = CU.cacheId(cacheName);
     }
@@ -173,6 +221,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateCounters(Map<Integer, Long> cntrs) {
+        this.initUpdCntrs = cntrs;
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;
@@ -185,8 +238,32 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         if (rmtFilter != null)
             ctx.resource().injectGeneric(rmtFilter);
 
+        entryBufs = new ConcurrentHashMap<>();
+
+        backupQueue = new ConcurrentLinkedDeque8<>();
+
+        ackBuf = new AcknowledgeBuffer();
+
+        rcvs = new ConcurrentHashMap<>();
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
+        assert !skipPrimaryCheck || loc;
+
+        final GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        if (!internal && cctx != null && initUpdCntrs != null) {
+            Map<Integer, Long> map = cctx.topology().updateCounters();
+
+            for (Map.Entry<Integer, Long> e : map.entrySet()) {
+                Long cntr0 = initUpdCntrs.get(e.getKey());
+                Long cntr1 = e.getValue();
+
+                if (cntr0 == null || cntr1 > cntr0)
+                    initUpdCntrs.put(e.getKey(), cntr1);
+            }
+        }
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -212,11 +289,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return;
 
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
+                final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
+                // Skip the event if the cache has been stopped (no cache context).
+                if (cctx == null)
                     return;
 
+                // skipPrimaryCheck is set only when listening locally for replicated cache events.
+                assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
+
                 boolean notify = true;
 
                 if (rmtFilter != null) {
@@ -228,54 +309,94 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     }
                 }
 
-                if (notify) {
-                    if (loc)
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                    else {
-                        try {
-                            if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) {
-                                evt.entry().prepareMarshal(cctx);
-
-                                cctx.deploy().prepare(evt.entry());
+                try {
+                    final CacheContinuousQueryEntry entry = evt.entry();
+
+                    if (!notify)
+                        entry.markFiltered();
+
+                    if (primary || skipPrimaryCheck) {
+                        if (loc) {
+                            if (!localCache) {
+                                Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
+
+                                if (!entries.isEmpty()) {
+                                    final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+                                    Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+                                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                                            @Override public CacheEntryEvent<? extends K, ? extends V> apply(
+                                                CacheContinuousQueryEntry e) {
+                                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                                            }
+                                        },
+                                        new IgnitePredicate<CacheContinuousQueryEntry>() {
+                                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                                                return !entry.isFiltered();
+                                            }
+                                        }
+                                    );
+
+                                    locLsnr.onUpdated(evts);
+
+                                    if (!internal && !skipPrimaryCheck)
+                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                                }
+                            }
+                            else {
+                                if (!entry.isFiltered())
+                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
                             }
-                            else
-                                evt.entry().prepareMarshal(cctx);
-
-                            ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
                         }
-                        catch (ClusterTopologyCheckedException ex) {
-                            IgniteLogger log = ctx.log(getClass());
+                        else {
+                            if (!entry.isFiltered())
+                                prepareEntry(cctx, nodeId, entry);
 
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send event notification to node, node left cluster " +
-                                    "[node=" + nodeId + ", err=" + ex + ']');
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                            CacheContinuousQueryEntry e = handleEntry(entry);
+
+                            if (e != null)
+                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                         }
                     }
+                    else {
+                        if (!internal) {
+                            entry.markBackup();
 
-                    if (recordIgniteEvt) {
-                        ctx.event().record(new CacheQueryReadEvent<>(
-                            ctx.discovery().localNode(),
-                            "Continuous query executed.",
-                            EVT_CACHE_QUERY_OBJECT_READ,
-                            CacheQueryType.CONTINUOUS.name(),
-                            cacheName,
-                            null,
-                            null,
-                            null,
-                            rmtFilter,
-                            null,
-                            nodeId,
-                            taskName(),
-                            evt.getKey(),
-                            evt.getValue(),
-                            evt.getOldValue(),
-                            null
-                        ));
+                            backupQueue.add(entry);
+                        }
                     }
                 }
+                catch (ClusterTopologyCheckedException ex) {
+                    IgniteLogger log = ctx.log(getClass());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send event notification to node, node left cluster " +
+                            "[node=" + nodeId + ", err=" + ex + ']');
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                }
+
+                if (recordIgniteEvt && notify) {
+                    ctx.event().record(new CacheQueryReadEvent<>(
+                        ctx.discovery().localNode(),
+                        "Continuous query executed.",
+                        EVT_CACHE_QUERY_OBJECT_READ,
+                        CacheQueryType.CONTINUOUS.name(),
+                        cacheName,
+                        null,
+                        null,
+                        null,
+                        rmtFilter,
+                        null,
+                        nodeId,
+                        taskName(),
+                        evt.getKey(),
+                        evt.getValue(),
+                        evt.getOldValue(),
+                        null
+                    ));
+                }
             }
 
             @Override public void onUnregister() {
@@ -283,6 +404,85 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
             }
 
+            @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
+                Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
+
+                while (it.hasNext()) {
+                    CacheContinuousQueryEntry backupEntry = it.next();
+
+                    Long updateCntr = updateCntrs.get(backupEntry.partition());
+
+                    if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
+                        it.remove();
+                }
+            }
+
+            @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
+                if (backupQueue.isEmpty())
+                    return;
+
+                try {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    for (CacheContinuousQueryEntry e : backupQueue) {
+                        if (!e.isFiltered())
+                            prepareEntry(cctx, nodeId, e);
+                    }
+
+                    ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+
+                    backupQueue.clear();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e);
+                }
+            }
+
+            @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
+                sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
+            }
+
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
+                try {
+                    assert evt != null;
+
+                    CacheContinuousQueryEntry e = evt.entry();
+
+                    EntryBuffer buf = entryBufs.get(e.partition());
+
+                    if (buf == null) {
+                        buf = new EntryBuffer();
+
+                        EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+
+                        if (oldRec != null)
+                            buf = oldRec;
+                    }
+
+                    e = buf.skipEntry(e);
+
+                    if (e != null)
+                        ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
+                }
+                catch (ClusterTopologyCheckedException ex) {
+                    IgniteLogger log = ctx.log(getClass());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send event notification to node, node left cluster " +
+                                "[node=" + nodeId + ", err=" + ex + ']');
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+                }
+            }
+
+            @Override public void onPartitionEvicted(int part) {
+                for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) {
+                    if (it.next().partition() == part)
+                        it.remove();
+                }
+            }
+
             @Override public boolean oldValueRequired() {
                 return oldValRequired;
             }
@@ -304,6 +504,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return mgr.registerListener(routineId, lsnr, internal);
     }
 
+    /**
+     * @param cctx Context.
+     * @param nodeId ID of the node that started routine.
+     * @param entry Entry.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
+        throws IgniteCheckedException {
+        if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) {
+            entry.prepareMarshal(cctx);
+
+            cctx.deploy().prepare(entry);
+        }
+        else
+            entry.prepareMarshal(cctx);
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -366,17 +583,377 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
+        for (CacheContinuousQueryEntry e : entries)
+            entries0.addAll(handleEvent(ctx, e));
+
+        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
             new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
                 @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
                     return new CacheContinuousQueryEvent<>(cache, cctx, e);
                 }
+            },
+            new IgnitePredicate<CacheContinuousQueryEntry>() {
+                @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                    return !entry.isFiltered();
+                }
             }
         );
 
         locLsnr.onUpdated(evts);
     }
 
+    /**
+     * @param ctx Context.
+     * @param e entry.
+     * @return Entry collection.
+     */
+    private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx,
+        CacheContinuousQueryEntry e) {
+        assert e != null;
+
+        if (internal) {
+            if (e.isFiltered())
+                return Collections.emptyList();
+            else
+                return F.asList(e);
+        }
+
+        // Initial query entry or evicted entry.
+        // These events should be fired immediately.
+        if (e.updateCounter() == -1)
+            return F.asList(e);
+
+        PartitionRecovery rec = rcvs.get(e.partition());
+
+        if (rec == null) {
+            rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(),
+                initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+
+            if (oldRec != null)
+                rec = oldRec;
+        }
+
+        return rec.collectEntries(e);
+    }
+
+    /**
+     * @param e Entry.
+     * @return Entry.
+     */
+    private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+        assert e != null;
+        assert entryBufs != null;
+
+        if (internal) {
+            if (e.isFiltered())
+                return null;
+            else
+                return e;
+        }
+
+        // Initial query entry.
+        // This event should be fired immediately.
+        if (e.updateCounter() == -1)
+            return e;
+
+        EntryBuffer buf = entryBufs.get(e.partition());
+
+        if (buf == null) {
+            buf = new EntryBuffer();
+
+            EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+
+            if (oldRec != null)
+                buf = oldRec;
+        }
+
+        return buf.handle(e);
+    }
+
+    /**
+     *
+     */
+    private static class PartitionRecovery {
+        /** Marker entry which denotes a hole in the update counter sequence. */
+        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+
+        /** */
+        private final static int MAX_BUFF_SIZE = 100;
+
+        /** */
+        private IgniteLogger log;
+
+        /** */
+        private long lastFiredEvt;
+
+        /** */
+        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
+
+        /** */
+        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+
+        /**
+         * @param log Logger.
+         * @param topVer Topology version.
+         * @param initCntr Initial update counter, may be {@code null}.
+         */
+        public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+            this.log = log;
+
+            if (initCntr != null) {
+                this.lastFiredEvt = initCntr;
+
+                curTop = topVer;
+            }
+        }
+
+        /**
+         * Add continuous entry.
+         *
+         * @param entry Cache continuous query entry.
+         * @return Collection of entries which will be fired.
+         */
+        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
+            assert entry != null;
+
+            List<CacheContinuousQueryEntry> entries;
+
+            synchronized (pendingEvts) {
+                // Received first event.
+                if (curTop == AffinityTopologyVersion.NONE) {
+                    lastFiredEvt = entry.updateCounter();
+
+                    curTop = entry.topologyVersion();
+
+                    return F.asList(entry);
+                }
+
+                if (curTop.compareTo(entry.topologyVersion()) < 0) {
+                    if (entry.updateCounter() == 1 && !entry.isBackup()) {
+                        entries = new ArrayList<>(pendingEvts.size());
+
+                        for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+                            if (evt != HOLE && !evt.isFiltered())
+                                entries.add(evt);
+                        }
+
+                        pendingEvts.clear();
+
+                        curTop = entry.topologyVersion();
+
+                        lastFiredEvt = entry.updateCounter();
+
+                        entries.add(entry);
+
+                        return entries;
+                    }
+
+                    curTop = entry.topologyVersion();
+                }
+
+                // Check duplicate.
+                if (entry.updateCounter() > lastFiredEvt) {
+                    pendingEvts.put(entry.updateCounter(), entry);
+
+                    // Put filtered events.
+                    if (entry.filteredEvents() != null) {
+                        for (long cnrt : entry.filteredEvents()) {
+                            if (cnrt > lastFiredEvt)
+                                pendingEvts.put(cnrt, HOLE);
+                        }
+                    }
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Skip duplicate continuous query message: " + entry);
+
+                    return Collections.emptyList();
+                }
+
+                if (pendingEvts.isEmpty())
+                    return Collections.emptyList();
+
+                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
+
+                entries = new ArrayList<>();
+
+                if (pendingEvts.size() >= MAX_BUFF_SIZE) {
+                    for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
+                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                            entries.add(e.getValue());
+
+                        lastFiredEvt = e.getKey();
+
+                        iter.remove();
+                    }
+                }
+                else {
+                    // Fire entries while their update counters are consecutive.
+                    while (iter.hasNext()) {
+                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                        if (e.getKey() == lastFiredEvt + 1) {
+                            ++lastFiredEvt;
+
+                            if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                                entries.add(e.getValue());
+
+                            iter.remove();
+                        }
+                        else
+                            break;
+                    }
+                }
+            }
+
+            return entries;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class EntryBuffer {
+        /** */
+        private final static int MAX_BUFF_SIZE = 100;
+
+        /** */
+        private final GridConcurrentSkipListSet<Long> buf = new GridConcurrentSkipListSet<>();
+
+        /** */
+        private AtomicLong lastFiredCntr = new AtomicLong();
+
+        /**
+         * @param newVal New value.
+         * @return Previous value if it was less than the new value, otherwise {@code -1}.
+         */
+        private long updateFiredCounter(long newVal) {
+            long prevVal = lastFiredCntr.get();
+
+            while (prevVal < newVal) {
+                if (lastFiredCntr.compareAndSet(prevVal, newVal))
+                    return prevVal;
+                else
+                    prevVal = lastFiredCntr.get();
+            }
+
+            return prevVal >= newVal ? -1 : prevVal;
+        }
+
+        /**
+         * @param e Entry.
+         * @return Entry marked as filtered if it should be sent right away,
+         *      or {@code null} if its update counter was only buffered.
+         */
+        private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
+            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
+
+                e.markFiltered();
+
+                return e;
+            }
+            else {
+                buf.add(e.updateCounter());
+
+                // Double check. If another thread sent an event with counter higher than this event.
+                if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
+                    buf.remove(e.updateCounter());
+
+                    e.markFiltered();
+
+                    return e;
+                }
+                else
+                    return null;
+            }
+        }
+
+        /**
+         * Add continuous entry.
+         *
+         * @param e Cache continuous query entry.
+         * @return Entry which will be fired, or {@code null} if the filtered entry was only buffered.
+         */
+        public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
+            assert e != null;
+
+            if (e.isFiltered()) {
+                Long last = buf.lastx();
+                Long first = buf.firstx();
+
+                if (last != null && first != null && last - first >= MAX_BUFF_SIZE) {
+                    NavigableSet<Long> prevHoles = buf.subSet(first, true, last, true);
+
+                    GridLongList filteredEvts = new GridLongList((int)(last - first));
+
+                    int size = 0;
+
+                    Long cntr;
+
+                    while ((cntr = prevHoles.pollFirst()) != null) {
+                        filteredEvts.add(cntr);
+
+                        ++size;
+                    }
+
+                    filteredEvts.truncate(size, true);
+
+                    e.filteredEvents(filteredEvts);
+
+                    return e;
+                }
+
+                if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1)
+                    return e;
+                else {
+                    buf.add(e.updateCounter());
+
+                    // Double check. If another thread sent an event with counter higher than this event.
+                    if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
+                        buf.remove(e.updateCounter());
+
+                        return e;
+                    }
+                    else
+                        return null;
+                }
+            }
+            else {
+                long prevVal = updateFiredCounter(e.updateCounter());
+
+                if (prevVal == -1)
+                    return e;
+                else {
+                    NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true);
+
+                    GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal));
+
+                    int size = 0;
+
+                    Long cntr;
+
+                    while ((cntr = prevHoles.pollFirst()) != null) {
+                        filteredEvts.add(cntr);
+
+                        ++size;
+                    }
+
+                    filteredEvts.truncate(size, true);
+
+                    e.filteredEvents(filteredEvts);
+
+                    return e;
+                }
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
@@ -397,6 +974,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(final UUID routineId,
+        GridContinuousBatch batch,
+        final GridKernalContext ctx) {
+        sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
+    }
+
+    /**
+     * @param t Acknowledge information.
+     * @param routineId Routine ID.
+     * @param ctx Context.
+     */
+    private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
+        final UUID routineId,
+        final GridKernalContext ctx) {
+        if (t != null) {
+            ctx.closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(),
+                        routineId,
+                        t.get1());
+
+                    Collection<ClusterNode> nodes = new HashSet<>();
+
+                    for (AffinityTopologyVersion topVer : t.get2())
+                        nodes.addAll(ctx.discovery().cacheNodes(topVer));
+
+                    for (ClusterNode node : nodes) {
+                        if (!node.id().equals(ctx.localNodeId())) {
+                            try {
+                                cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+                            }
+                            catch (ClusterTopologyCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send acknowledge message, node left " +
+                                        "[msg=" + msg + ", node=" + node + ']');
+                            }
+                            catch (IgniteCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
+
+                                U.error(log, "Failed to send acknowledge message " +
+                                    "[msg=" + msg + ", node=" + node + ']', e);
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return topic;
     }
@@ -471,6 +1107,93 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
+    /** */
+    private static class AcknowledgeBuffer {
+        /** */
+        private int size;
+
+        /** */
+        @GridToStringInclude
+        private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+        /** */
+        @GridToStringInclude
+        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+        /**
+         * @param batch Batch.
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @SuppressWarnings("unchecked")
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(GridContinuousBatch batch) {
+            size += batch.size();
+
+            Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+            for (CacheContinuousQueryEntry e : entries)
+                addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Entry.
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(CacheContinuousQueryEntry e) {
+            size++;
+
+            addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Entry.
+         */
+        private void addEntry(CacheContinuousQueryEntry e) {
+            topVers.add(e.topologyVersion());
+
+            Long cntr0 = updateCntrs.get(e.partition());
+
+            if (cntr0 == null || e.updateCounter() > cntr0)
+                updateCntrs.put(e.partition(), e.updateCounter());
+        }
+
+        /**
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+            acknowledgeOnTimeout() {
+            return size > 0 ? acknowledgeData() : null;
+        }
+
+        /**
+         * @return Tuple with acknowledge information.
+         */
+        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+            assert size > 0;
+
+            Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
+
+            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+                new IgniteBiTuple<>(cntrs, topVers);
+
+            topVers = U.newHashSet(1);
+
+            size = 0;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AcknowledgeBuffer.class, this);
+        }
+    }
+
     /**
      * Deployable object.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a3c19a9..8342acf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.Map;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
 /**
  * Continuous query listener.
  */
@@ -41,6 +45,37 @@ interface CacheContinuousQueryListener<K, V> {
     public void onUnregister();
 
     /**
+     * Cleans backup queue.
+     *
+     * @param updateCntrs Update counters by partition.
+     */
+    public void cleanupBackupQueue(Map<Integer, Long> updateCntrs);
+
+    /**
+     * Flushes backup queue.
+     *
+     * @param ctx Context.
+     * @param topVer Topology version.
+     */
+    public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer);
+
+    /**
+     * @param ctx Context.
+     */
+    public void acknowledgeBackupOnTimeout(GridKernalContext ctx);
+
+    /**
+     * @param evt Event
+     * @param topVer Topology version.
+     */
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+
+    /**
+     * @param part Partition.
+     */
+    public void onPartitionEvicted(int part);
+
+    /**
      * @return Whether old value is required.
      */
     public boolean oldValueRequired();


Mime
View raw message