ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [02/36] ignite git commit: IGNITE-426 temp commit.
Date Wed, 04 Nov 2015 14:10:45 GMT
IGNITE-426 temp commit.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a57d71e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a57d71e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a57d71e

Branch: refs/heads/ignite-462-2
Commit: 3a57d71e3fa043ecd08c2eb4a1981eddc31142dd
Parents: dca9e57
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Sep 2 15:38:50 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Nov 4 17:01:56 2015 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |   40 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   61 +-
 .../dht/GridDhtPartitionTopology.java           |   26 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  112 +-
 .../CacheContinuousQueryBatchAck.java           |  156 +++
 .../continuous/CacheContinuousQueryEntry.java   |   74 +-
 .../continuous/CacheContinuousQueryHandler.java |  347 +++++-
 .../CacheContinuousQueryListener.java           |   34 +-
 .../continuous/CacheContinuousQueryManager.java |   88 +-
 .../continuous/GridContinuousBatch.java         |    7 +
 .../continuous/GridContinuousBatchAdapter.java  |    7 +
 .../continuous/GridContinuousProcessor.java     |  173 ++-
 ...acheContinuousQueryFailoverAbstractTest.java | 1104 ++++++++++++++++++
 ...ueryFailoverAtomicPrimaryWriteOrderTest.java |   32 +
 ...inuousQueryFailoverAtomicReplicatedTest.java |   39 +
 .../CacheContinuousQueryFailoverAtomicTest.java |   39 +
 ...ContinuousQueryFailoverTxReplicatedTest.java |   32 +
 .../CacheContinuousQueryFailoverTxTest.java     |   39 +
 18 files changed, 2313 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 162c116..516b7bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Lock. */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    /** Partition update counter. */
+    private Map<Integer, Long> cntrMap = new HashMap<>();
+
     /**
      * @param cctx Context.
      * @param cacheId Cache ID.
@@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
+        GridDhtPartitionFullMap partMap,
+        Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
@@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
             part2node = p2n;
 
+            if (cntrMap != null)
+                this.cntrMap = new HashMap<>(cntrMap);
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
+        GridDhtPartitionMap parts,
+        Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 }
             }
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+            }
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -852,6 +869,25 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> updateCounters() {
+        lock.readLock().lock();
+
+        try {
+            return new HashMap<>(cntrMap);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        assert false;
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 749d06a..86f1f41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -111,11 +123,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     private final LongAdder8 mapPubSize = new LongAdder8();
 
     /** Remove queue. */
-    private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+    private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
 
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
 
+    /** Continuous query update index. */
+    private final AtomicLong contQryUpdIdx = new AtomicLong();
+
     /**
      * @param cctx Context.
      * @param id Partition ID.
@@ -141,7 +156,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
 
-        rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+        if (cctx.deferredDelete())
+            rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
     }
 
     /**
@@ -295,6 +311,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @throws IgniteCheckedException If failed.
      */
     public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException {
+        assert cctx.deferredDelete();
+
         try {
             T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver));
 
@@ -496,7 +514,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
 
-            clearDeferredDeletes();
+            if (cctx.deferredDelete())
+                clearDeferredDeletes();
 
             return new GridFinishedFuture<>(true);
         }
@@ -541,13 +560,16 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             if (cctx.isDrEnabled())
                 cctx.dr().partitionEvicted(id);
 
+            cctx.continuousQueries().onPartitionEvicted(id);
+
             cctx.dataStructures().onPartitionEvicted(id);
 
             rent.onDone();
 
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
 
-            clearDeferredDeletes();
+            if (cctx.deferredDelete())
+                clearDeferredDeletes();
 
             return true;
         }
@@ -612,6 +634,35 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @return Next update index.
+     */
+    public long nextContinuousQueryUpdateIndex() {
+        return contQryUpdIdx.incrementAndGet();
+    }
+
+    /**
+     * @return Current update index.
+     */
+    public long continuousQueryUpdateIndex() {
+        return contQryUpdIdx.get();
+    }
+
+    /**
+     * @param val Update index value.
+     */
+    public void continuousQueryUpdateIndex(long val) {
+        while (true) {
+            long val0 = contQryUpdIdx.get();
+
+            if (val0 >= val)
+                break;
+
+            if (contQryUpdIdx.compareAndSet(val0, val))
+                break;
+        }
+    }
+
+    /**
      * Clears values for this partition.
      */
     private void clearAll() {
@@ -761,6 +812,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     private void clearDeferredDeletes() {
+        assert cctx.deferredDelete();
+
         rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() {
             @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) {
                 cctx.dht().removeVersionedEntry(t.get1(), t.get2());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d642314..3ac2b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @param updateSeq Update sequence.
+     * @param stopping Stopping flag.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
@@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param partMap Update partition map.
+     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap);
+    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionFullMap partMap,
+        @Nullable Map<Integer, Long> cntrMap);
 
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
+     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
     @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts);
+        GridDhtPartitionMap parts,
+        @Nullable Map<Integer, Long> cntrMap);
+
+    /**
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> updateCounters();
 
     /**
      * @param part Partition to own.
@@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology {
 
     /**
      * @param part Evicted partition.
+     * @param updateSeq Update sequence increment flag.
      */
     public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
 
@@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology {
      * @param threshold Threshold for number of entries.
      */
     public void printMemoryStats(int threshold);
-}
\ No newline at end of file
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if rebalance process finished.
+     */
+    public boolean rebalanceFinished(AffinityTopologyVersion topVer);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6bd283a..5d312b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Lock. */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    /** Partition update counter. */
+    private Map<Integer, Long> cntrMap = new HashMap<>();
+
+    /** */
+    private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
     /**
      * @param cctx Context.
      */
@@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             topReadyFut = null;
 
             topVer = AffinityTopologyVersion.NONE;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;;
         }
         finally {
             lock.writeLock().unlock();
@@ -525,6 +535,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
+            updateRebalanceVersion();
+
             consistencyCheck();
         }
         finally {
@@ -732,7 +744,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param states Additional partition states.
      * @return List of nodes for the partition.
      */
-    private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+    private List<ClusterNode> nodes(int p,
+        AffinityTopologyVersion topVer,
+        GridDhtPartitionState state,
+        GridDhtPartitionState... states) {
         Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
 
         lock.readLock().lock();
@@ -831,7 +846,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
+        GridDhtPartitionFullMap partMap,
+        @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
@@ -911,8 +927,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node = p2n;
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+
+                for (GridDhtLocalPartition part : locParts.values()) {
+                    Long cntr = cntrMap.get(part.id());
+
+                    if (cntr != null)
+                        part.continuousQueryUpdateIndex(cntr);
+                }
+            }
+
             boolean changed = checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -928,7 +962,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
+        GridDhtPartitionMap parts,
+        @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -1006,8 +1041,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+
+                for (GridDhtLocalPartition part : locParts.values()) {
+                    Long cntr = cntrMap.get(part.id());
+
+                    if (cntr != null)
+                        part.continuousQueryUpdateIndex(cntr);
+                }
+            }
+
             changed |= checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -1204,6 +1257,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (part.own()) {
                 updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
 
+                updateRebalanceVersion();
+
                 consistencyCheck();
 
                 return true;
@@ -1254,14 +1309,61 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> updateCounters() {
+        lock.readLock().lock();
+
+        try {
+            Map<Integer, Long> res = new HashMap<>(cntrMap);
+
+            for (GridDhtLocalPartition part : locParts.values()) {
+                Long cntr0 = res.get(part.id());
+                Long cntr1 = part.continuousQueryUpdateIndex();
+
+                if (cntr0 == null || cntr1 > cntr0)
+                    res.put(part.id(), cntr1);
+            }
+
+            return res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        return topVer.equals(rebalancedTopVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
+        X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
         for (GridDhtLocalPartition part : locParts.values()) {
             int size = part.size();
 
             if (size >= threshold)
-                X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
+                X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
+        }
+    }
+
+    /**
+     *
+     */
+    private void updateRebalanceVersion() {
+        if (!rebalancedTopVer.equals(topVer)) {
+            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+                List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+                List<ClusterNode> owners = owners(i);
+
+                if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+                    return;
+            }
+
+            rebalancedTopVer = topVer;
+
+            if (log.isDebugEnabled())
+                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
new file mode 100644
index 0000000..1e9a848
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Batch acknowledgement.
+ */
+public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Update indexes. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+    private Map<Integer, Long> updateIdxs;
+
+    /**
+     * Default constructor.
+     */
+    public CacheContinuousQueryBatchAck() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param routineId Routine ID.
+     * @param updateIdxs Update indexes.
+     */
+    CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateIdxs) {
+        this.cacheId = cacheId;
+        this.routineId = routineId;
+        this.updateIdxs = updateIdxs;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Update indexes.
+     */
+    Map<Integer, Long> updateIndexes() {
+        return updateIdxs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 114;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryBatchAck.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index a4b35eb..9ea9b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -22,6 +22,7 @@ import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -75,6 +76,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     @GridDirectTransient
     private GridDeploymentInfo depInfo;
 
+    /** Partition. */
+    private int part;
+
+    /** Update index. */
+    private long updateIdx;
+
+    /** */
+    @GridToStringInclude
+    @GridDirectTransient
+    private AffinityTopologyVersion topVer;
+
     /**
      * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
      */
@@ -88,18 +100,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param part Partition.
+     * @param updateIdx Update index.
+     * @param topVer Topology version if applicable.
      */
     CacheContinuousQueryEntry(
         int cacheId,
         EventType evtType,
         KeyCacheObject key,
         @Nullable CacheObject newVal,
-        @Nullable CacheObject oldVal) {
+        @Nullable CacheObject oldVal,
+        int part,
+        long updateIdx,
+        @Nullable AffinityTopologyVersion topVer) {
         this.cacheId = cacheId;
         this.evtType = evtType;
         this.key = key;
         this.newVal = newVal;
         this.oldVal = oldVal;
+        this.part = part;
+        this.updateIdx = updateIdx;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Topology version if applicable.
+     */
+    @Nullable AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -117,6 +145,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return Partition.
+     */
+    int partition() {
+        return part;
+    }
+
+    /**
+     * @return Update index.
+     */
+    long updateIndex() {
+        return updateIdx;
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
@@ -225,6 +267,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 writer.incrementState();
 
+            case 5:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeLong("updateIdx", updateIdx))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -282,6 +336,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
+            case 5:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                updateIdx = reader.readLong("updateIdx");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -289,7 +359,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e517c70..4f783db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,7 +22,12 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -35,13 +40,17 @@ import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -49,7 +58,10 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -61,6 +73,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int BACKUP_ACK_THRESHOLD = 100;
+
     /** Cache name. */
     private String cacheName;
 
@@ -97,6 +112,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
+    /** Backup queue. */
+    private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+    /** */
+    private transient Map<Integer, Long> rcvCntrs;
+
+    /** */
+    private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
+
+    /** */
+    private transient AcknowledgeBuffer ackBuf;
+
     /** */
     private transient int cacheId;
 
@@ -121,6 +148,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param ignoreExpired Ignore expired events flag.
      * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
      * @param taskHash Task name hash code.
+     * @param locCache {@code True} if local cache.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
@@ -133,7 +161,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         boolean sync,
         boolean ignoreExpired,
         int taskHash,
-        boolean skipPrimaryCheck) {
+        boolean skipPrimaryCheck,
+        boolean locCache) {
         assert topic != null;
         assert locLsnr != null;
 
@@ -149,6 +178,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
 
+        if (locCache)
+            dupEvtFilter = F.alwaysTrue();
+        else {
+            rcvCntrs = new ConcurrentHashMap<>();
+
+            dupEvtFilter = new DuplicateEventFilter();
+        }
+
         cacheId = CU.cacheId(cacheName);
     }
 
@@ -185,8 +222,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         if (rmtFilter != null)
             ctx.resource().injectGeneric(rmtFilter);
 
+        backupQueue = new ConcurrentLinkedDeque8<>();
+
+        ackBuf = new AcknowledgeBuffer();
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
+        assert !skipPrimaryCheck || loc;
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -207,15 +250,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 }
             }
 
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+                boolean primary,
                 boolean recordIgniteEvt) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return;
 
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
-                    return;
+                // skipPrimaryCheck is set only when listen locally for replicated cache events.
+                assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
                 boolean notify = true;
 
@@ -229,30 +273,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 }
 
                 if (notify) {
-                    if (loc)
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                    else {
-                        try {
-                            if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) {
-                                evt.entry().prepareMarshal(cctx);
-
-                                cctx.deploy().prepare(evt.entry());
+                    try {
+                        final CacheContinuousQueryEntry entry = evt.entry();
+
+                        if (primary || skipPrimaryCheck) {
+                            if (loc) {
+                                if (dupEvtFilter.apply(entry)) {
+                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+
+                                    if (!skipPrimaryCheck)
+                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                                }
                             }
-                            else
-                                evt.entry().prepareMarshal(cctx);
+                            else {
+                                prepareEntry(cctx, nodeId, entry);
 
-                            ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
+                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+                            }
                         }
-                        catch (ClusterTopologyCheckedException ex) {
-                            IgniteLogger log = ctx.log(getClass());
+                        else
+                            backupQueue.add(entry);
+                    }
+                    catch (ClusterTopologyCheckedException ex) {
+                        IgniteLogger log = ctx.log(getClass());
 
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send event notification to node, node left cluster " +
-                                    "[node=" + nodeId + ", err=" + ex + ']');
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                        }
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send event notification to node, node left cluster " +
+                                "[node=" + nodeId + ", err=" + ex + ']');
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
                     }
 
                     if (recordIgniteEvt) {
@@ -283,6 +333,49 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
             }
 
+            @Override public void cleanupBackupQueue(Map<Integer, Long> updateIdxs) {
+                Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
+
+                while (it.hasNext()) {
+                    CacheContinuousQueryEntry backupEntry = it.next();
+
+                    Long updateIdx = updateIdxs.get(backupEntry.partition());
+
+                    if (updateIdx != null && backupEntry.updateIndex() <= updateIdx)
+                        it.remove();
+                }
+            }
+
+            @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
+                if (backupQueue.isEmpty())
+                    return;
+
+                try {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    for (CacheContinuousQueryEntry e : backupQueue)
+                        prepareEntry(cctx, nodeId, e);
+
+                    ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+
+                    backupQueue.clear();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e);
+                }
+            }
+
+            @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
+                sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
+            }
+
+            @Override public void onPartitionEvicted(int part) {
+                for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) {
+                    if (it.next().partition() == part)
+                        it.remove();
+                }
+            }
+
             @Override public boolean oldValueRequired() {
                 return oldValRequired;
             }
@@ -304,6 +397,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return mgr.registerListener(routineId, lsnr, internal);
     }
 
+    /**
+     * @param cctx Context.
+     * @param nodeId ID of the node that started routine.
+     * @param entry Entry.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
+        throws IgniteCheckedException {
+        if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) {
+            entry.prepareMarshal(cctx);
+
+            cctx.deploy().prepare(entry);
+        }
+        else
+            entry.prepareMarshal(cctx);
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -371,12 +481,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
                     return new CacheContinuousQueryEvent<>(cache, cctx, e);
                 }
-            }
+            },
+            dupEvtFilter
         );
 
         locLsnr.onUpdated(evts);
     }
 
+    /**
+     * @param e Entry.
+     * @return {@code True} if listener should be notified.
+     */
+    private boolean notifyListener(CacheContinuousQueryEntry e) {
+        Integer part = e.partition();
+
+        Long cntr = rcvCntrs.get(part);
+
+        if (cntr != null) {
+            long cntr0 = cntr;
+
+            if (e.updateIndex() > cntr0) {
+                // TODO IGNITE-426: remove assert.
+                assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
+
+                rcvCntrs.put(part, e.updateIndex());
+            }
+            else
+                return false;
+        }
+        else
+            rcvCntrs.put(part, e.updateIndex());
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
@@ -397,6 +535,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(final UUID routineId,
+        GridContinuousBatch batch,
+        final GridKernalContext ctx) {
+        sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
+    }
+
+    /**
+     * @param t Acknowledge information.
+     * @param routineId Routine ID.
+     * @param ctx Context.
+     */
+    private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
+        final UUID routineId,
+        final GridKernalContext ctx) {
+        if (t != null) {
+            ctx.closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(),
+                        routineId,
+                        t.get1());
+
+                    Collection<ClusterNode> nodes = new HashSet<>();
+
+                    for (AffinityTopologyVersion topVer : t.get2())
+                        nodes.addAll(ctx.discovery().cacheNodes(topVer));
+
+                    for (ClusterNode node : nodes) {
+                        if (!node.id().equals(ctx.localNodeId())) {
+                            try {
+                                cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+                            }
+                            catch (ClusterTopologyCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send acknowledge message, node left " +
+                                        "[msg=" + msg + ", node=" + node + ']');
+                            }
+                            catch (IgniteCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
+
+                                U.error(log, "Failed to send acknowledge message " +
+                                    "[msg=" + msg + ", node=" + node + ']', e);
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return topic;
     }
@@ -471,6 +668,106 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
+    /** */
+    private static class AcknowledgeBuffer {
+        /** */
+        private int size;
+
+        /** */
+        @GridToStringInclude
+        private Map<Integer, Long> updateIdxs = new HashMap<>();
+
+        /** */
+        @GridToStringInclude
+        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+        /**
+         * @param batch Batch.
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @SuppressWarnings("unchecked")
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(GridContinuousBatch batch) {
+            size += batch.size();
+
+            Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+            for (CacheContinuousQueryEntry e : entries)
+                addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Entry.
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(CacheContinuousQueryEntry e) {
+            size++;
+
+            addEntry(e);
+
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
+
+        /**
+         * @param e Entry.
+         */
+        private void addEntry(CacheContinuousQueryEntry e) {
+            topVers.add(e.topologyVersion());
+
+            Long cntr0 = updateIdxs.get(e.partition());
+
+            if (cntr0 == null || e.updateIndex() > cntr0)
+                updateIdxs.put(e.partition(), e.updateIndex());
+        }
+
+        /**
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+            acknowledgeOnTimeout() {
+            return size > 0 ? acknowledgeData() : null;
+        }
+
+        /**
+         * @return Tuple with acknowledge information.
+         */
+        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+            assert size > 0;
+
+            Map<Integer, Long> idxs = new HashMap<>(updateIdxs);
+
+            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+                new IgniteBiTuple<>(idxs, topVers);
+
+            topVers = U.newHashSet(1);
+
+            size = 0;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AcknowledgeBuffer.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(CacheContinuousQueryEntry e) {
+            return notifyListener(e);
+        }
+    }
+
     /**
      * Deployable object.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a3c19a9..2f9e111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+
+import java.util.*;
+
 /**
  * Continuous query listener.
  */
@@ -33,7 +38,9 @@ interface CacheContinuousQueryListener<K, V> {
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
      */
-    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+        boolean primary,
+        boolean recordIgniteEvt);
 
     /**
      * Listener unregistered callback.
@@ -41,6 +48,31 @@ interface CacheContinuousQueryListener<K, V> {
     public void onUnregister();
 
     /**
+     * Cleans backup queue.
+     *
+     * @param updateIdxs Update indexes map.
+     */
+    public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
+
+    /**
+     * Flushes backup queue.
+     *
+     * @param ctx Context.
+     * @param topVer Topology version.
+     */
+    public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer);
+
+    /**
+     * @param ctx Context.
+     */
+    public void acknowledgeBackupOnTimeout(GridKernalContext ctx);
+
+    /**
+     * @param part Partition.
+     */
+    public void onPartitionEvicted(int part);
+
+    /**
      * @return Whether old value is required.
      */
     public boolean oldValueRequired();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c7bf091..f0e9c0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -82,6 +83,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** */
     private static final byte EXPIRED_FLAG = 0b1000;
 
+    /** */
+    private static final long BACKUP_ACK_FREQ = 5000;
+
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
 
@@ -108,6 +112,26 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         // Append cache name to the topic.
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
+
+        cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+            new CI2<UUID, CacheContinuousQueryBatchAck>() {
+                @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+                    CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+                    if (lsnr != null)
+                        lsnr.cleanupBackupQueue(msg.updateIndexes());
+                }
+            });
+
+        cctx.time().schedule(new Runnable() {
+            @Override public void run() {
+                for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                    lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+
+                for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+                    lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+            }
+        }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
     }
 
     /** {@inheritDoc} */
@@ -141,18 +165,25 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
+     * @param updateIdx Update index.
+     * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx e,
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
-        boolean preload)
+        boolean primary,
+        boolean preload,
+        long updateIdx,
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert e != null;
         assert key != null;
+        assert Thread.holdsLock(e) : e;
 
         boolean internal = e.isInternal() || !e.context().userCache();
 
@@ -179,8 +210,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean initialized = false;
 
-        boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
-        boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
@@ -205,7 +235,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 evtType,
                 key,
                 newVal,
-                lsnr.oldValueRequired() ? oldVal : null);
+                lsnr.oldValueRequired() ? oldVal : null,
+                e.partition(),
+                updateIdx,
+                topVer);
 
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -224,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         throws IgniteCheckedException {
         assert e != null;
         assert key != null;
+        assert Thread.holdsLock(e) : e;
 
         if (e.isInternal())
             return;
@@ -255,7 +289,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                    EXPIRED,
                    key,
                    null,
-                   lsnr.oldValueRequired() ? oldVal : null);
+                   lsnr.oldValueRequired() ? oldVal : null,
+                   e.partition(),
+                   0,
+                   null);
 
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -373,6 +410,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    public void beforeExchange(AffinityTopologyVersion topVer) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+
+        for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+            lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+    }
+
+    /**
+     * Partition evicted callback.
+     *
+     * @param part Partition number.
+     */
+    public void onPartitionEvicted(int part) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            lsnr.onPartitionEvicted(part);
+
+        for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+            lsnr.onPartitionEvicted(part);
+    }
+
+    /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param bufSize Buffer size.
@@ -417,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             sync,
             ignoreExpired,
             taskNameHash,
-            skipPrimaryCheck);
+            skipPrimaryCheck,
+            cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -469,10 +531,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
                                 GridCacheEntryEx e = it.next();
 
+                                CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+                                    cctx.cacheId(),
+                                    CREATED,
+                                    e.key(),
+                                    e.rawGet(),
+                                    null,
+                                    0,
+                                    0,
+                                    null);
+
                                 next = new CacheContinuousQueryEvent<>(
                                     cctx.kernalContext().cache().jcache(cctx.name()),
-                                    cctx,
-                                    new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
+                                    cctx, entry);
 
                                 if (rmtFilter != null && !rmtFilter.evaluate(next))
                                     next = null;
@@ -637,6 +708,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         /**
          * @param impl Listener.
+         * @param log Logger.
          */
         JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
             assert impl != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
new file mode 100644
index 0000000..2fef161
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.processors.continuous;
+
+/**
+ * Created by Nikolay on 02.09.2015.
+ */
+public interface GridContinuousBatch {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
new file mode 100644
index 0000000..8e29e29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.processors.continuous;
+
+/**
+ * Created by Nikolay on 02.09.2015.
+ */
+public class GridContinuousBatchAdapter {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a57d71e/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d1cb3a9..15c9dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -651,6 +651,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /**
      * @param nodeId ID of the node that started routine.
      * @param routineId Routine ID.
+     * @param objs Notification objects.
+     * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void addBackupNotification(UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        @Nullable Object orderedTopic)
+        throws IgniteCheckedException {
+        if (processorStopped)
+            return;
+
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+        if (info != null) {
+            final GridContinuousBatch batch = info.addAll(objs);
+
+            sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null);
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node that started routine.
+     * @param routineId Routine ID.
      * @param obj Notification object.
      * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
      * @param sync If {@code true} then waits for event acknowledgment.
@@ -658,8 +682,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void addNotification(UUID nodeId,
-        UUID routineId,
-        @Nullable Object obj,
+        final UUID routineId,
+        Object obj,
         @Nullable Object orderedTopic,
         boolean sync,
         boolean msg)
@@ -673,7 +697,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         if (processorStopped)
             return;
 
-        RemoteRoutineInfo info = rmtInfos.get(routineId);
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
 
         if (info != null) {
             assert info.interval == 0 || !sync;
@@ -686,7 +710,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -697,10 +721,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 fut.get();
             }
             else {
-                Collection<Object> toSnd = info.add(obj);
+                final GridContinuousBatch batch = info.add(obj);
+
+                if (batch != null) {
+                    CI1<IgniteException> ackC = new CI1<IgniteException>() {
+                        @Override public void apply(IgniteException e) {
+                            if (e == null)
+                                info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+                        }
+                    };
 
-                if (toSnd != null)
-                    sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+                    sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC);
+                }
             }
         }
     }
@@ -725,6 +757,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
      * @param msg If {@code true} then sent data is collection of messages.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendNotification(UUID nodeId,
@@ -732,7 +765,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @Nullable IgniteUuid futId,
         Collection<Object> toSnd,
         @Nullable Object orderedTopic,
-        boolean msg) throws IgniteCheckedException {
+        boolean msg,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
         assert toSnd != null;
@@ -740,7 +774,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         sendWithRetries(nodeId,
             new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
-            orderedTopic);
+            orderedTopic,
+            ackC);
     }
 
     /**
@@ -859,6 +894,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 try {
                     sendWithRetries(nodeId,
                         new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
+                        null,
                         null);
                 }
                 catch (IgniteCheckedException e) {
@@ -922,15 +958,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 break;
                             }
 
-                            IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval();
+                            IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval();
 
-                            Collection<Object> toSnd = t.get1();
+                            final GridContinuousBatch batch = t.get1();
 
-                            if (toSnd != null && !toSnd.isEmpty()) {
+                            if (batch != null && batch.size() > 0) {
                                 try {
+                                    Collection<Object> toSnd = batch.collect();
+
                                     boolean msg = toSnd.iterator().next() instanceof Message;
 
-                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
+                                    CI1<IgniteException> ackC = new CI1<IgniteException>() {
+                                        @Override public void apply(IgniteException e) {
+                                            if (e == null)
+                                                info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+                                        }
+                                    };
+
+                                    sendNotification(nodeId,
+                                        routineId,
+                                        null,
+                                        toSnd,
+                                        hnd.orderedTopic(),
+                                        msg,
+                                        ackC);
                                 }
                                 catch (ClusterTopologyCheckedException ignored) {
                                     if (log.isDebugEnabled())
@@ -1013,9 +1064,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic)
+    private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
         assert nodeId != null;
         assert msg != null;
@@ -1023,7 +1076,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node != null)
-            sendWithRetries(node, msg, orderedTopic);
+            sendWithRetries(node, msg, orderedTopic, ackC);
         else
             throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId);
     }
@@ -1033,14 +1086,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic)
-        throws IgniteCheckedException {
+    private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic,
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert node != null;
         assert msg != null;
 
-        sendWithRetries(F.asList(node), msg, orderedTopic);
+        sendWithRetries(F.asList(node), msg, orderedTopic, ackC);
     }
 
     /**
@@ -1048,10 +1102,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg,
-        @Nullable Object orderedTopic) throws IgniteCheckedException {
+        @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
         assert !F.isEmpty(nodes);
         assert msg != null;
 
@@ -1074,10 +1129,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                             msg,
                             SYSTEM_POOL,
                             0,
-                            true);
+                            true,
+                            ackC);
                     }
                     else
-                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+                        ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
 
                     break;
                 }
@@ -1178,8 +1234,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         /** Lock. */
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-        /** Buffer. */
-        private ConcurrentLinkedDeque8<Object> buf;
+        /** Batch. */
+        private GridContinuousBatch batch;
 
         /** Last send time. */
         private long lastSndTime = U.currentTimeMillis();
@@ -1210,7 +1266,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             this.interval = interval;
             this.autoUnsubscribe = autoUnsubscribe;
 
-            buf = new ConcurrentLinkedDeque8<>();
+            batch = hnd.createBatch();
         }
 
         /**
@@ -1238,21 +1294,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * @param objs Objects to add.
+         * @return Batch to send.
+         */
+        GridContinuousBatch addAll(Collection<?> objs) {
+            assert objs != null;
+            assert objs.size() > 0;
+
+            GridContinuousBatch toSnd = null;
+
+            lock.writeLock().lock();
+
+            try {
+                for (Object obj : objs)
+                    batch.add(obj);
+
+                toSnd = batch;
+
+                batch = hnd.createBatch();
+
+                if (interval > 0)
+                    lastSndTime = U.currentTimeMillis();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+
+            return toSnd;
+        }
+
+        /**
          * @param obj Object to add.
-         * @return Object to send or {@code null} if there is nothing to send for now.
+         * @return Batch to send or {@code null} if there is nothing to send for now.
          */
-        @Nullable Collection<Object> add(@Nullable Object obj) {
-            ConcurrentLinkedDeque8 buf0 = null;
+        @Nullable GridContinuousBatch add(Object obj) {
+            assert obj != null;
 
-            if (buf.sizex() >= bufSize - 1) {
+            GridContinuousBatch toSnd = null;
+
+            if (batch.size() >= bufSize - 1) {
                 lock.writeLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
 
-                    buf0 = buf;
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     if (interval > 0)
                         lastSndTime = U.currentTimeMillis();
@@ -1265,34 +1353,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 lock.readLock().lock();
 
                 try {
-                    buf.add(obj);
+                    batch.add(obj);
                 }
                 finally {
                     lock.readLock().unlock();
                 }
             }
 
-            Collection<Object> toSnd = null;
-
-            if (buf0 != null) {
-                toSnd = new ArrayList<>(buf0.sizex());
-
-                for (Object o : buf0)
-                    toSnd.add(o);
-            }
-
             return toSnd;
         }
 
         /**
-         * @return Tuple with objects to sleep (or {@code null} if there is nothing to
+         * @return Tuple with batch to send (or {@code null} if there is nothing to
          *      send for now) and time interval after next check is needed.
          */
         @SuppressWarnings("TooBroadScope")
-        IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+        IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
             assert interval > 0;
 
-            Collection<Object> toSnd = null;
+            GridContinuousBatch toSnd = null;
             long diff;
 
             long now = U.currentTimeMillis();
@@ -1302,10 +1381,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             try {
                 diff = now - lastSndTime;
 
-                if (diff >= interval && !buf.isEmpty()) {
-                    toSnd = buf;
+                if (diff >= interval && batch.size() > 0) {
+                    toSnd = batch;
 
-                    buf = new ConcurrentLinkedDeque8<>();
+                    batch = hnd.createBatch();
 
                     lastSndTime = now;
                 }


Mime
View raw message