ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: cc
Date Thu, 25 May 2017 08:30:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc 79e34c2cf -> 7bf63c0ee


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7bf63c0e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7bf63c0e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7bf63c0e

Branch: refs/heads/ignite-5075-cc
Commit: 7bf63c0ee7d41317dc558d40267e40d9ed65fda3
Parents: 79e34c2
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 25 11:30:17 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 25 11:30:17 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryAcknowledgeBuffer.java  | 120 ++++
 .../CacheContinuousQueryDeployableObject.java   | 110 ++++
 .../CacheContinuousQueryEventBuffer.java        |  91 ++-
 .../continuous/CacheContinuousQueryHandler.java | 602 +++----------------
 .../CacheContinuousQueryHandlerV2.java          |   6 +-
 .../CacheContinuousQueryPartitionRecovery.java  | 252 ++++++++
 .../continuous/GridContinuousProcessor.java     |   7 +-
 .../CacheContinuousQueryEventBufferTest.java    |  65 +-
 8 files changed, 682 insertions(+), 571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
new file mode 100644
index 0000000..c95dc42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Collects acknowledged entries: per-partition max update counter, topology versions and entry count. */
+class CacheContinuousQueryAcknowledgeBuffer {
+    /** Number of entries added since the last acknowledge tuple was produced; reset to zero at that point. */
+    private int size;
+
+    /** Highest update counter seen so far, keyed by partition; never cleared by this class. */
+    @GridToStringInclude
+    private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+    /** Topology versions of entries added since the last acknowledge tuple was produced. */
+    @GridToStringInclude
+    private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+    /**
+     * @param batch Batch; must be a {@link GridContinuousQueryBatch}, all its entries are added to this buffer.
+     * @return Tuple for backups once size reaches {@code BACKUP_ACK_THRESHOLD}, otherwise {@code null}.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>onAcknowledged(
+        GridContinuousBatch batch) {
+        assert batch instanceof GridContinuousQueryBatch;
+
+        size += ((GridContinuousQueryBatch)batch).entriesCount();
+
+        Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+        for (CacheContinuousQueryEntry e : entries)
+            addEntry(e);
+
+        return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+    }
+
+    /**
+     * @param e Entry; counted as one and added to this buffer.
+     * @return Tuple for backups once size reaches {@code BACKUP_ACK_THRESHOLD}, otherwise {@code null}.
+     */
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+    onAcknowledged(CacheContinuousQueryEntry e) {
+        size++;
+
+        addEntry(e);
+
+        return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+    }
+
+    /**
+     * @param e Entry whose topology version is recorded and whose counter may raise its partition's maximum.
+     */
+    private void addEntry(CacheContinuousQueryEntry e) {
+        topVers.add(e.topologyVersion());
+
+        Long cntr0 = updateCntrs.get(e.partition());
+
+        if (cntr0 == null || e.updateCounter() > cntr0)
+            updateCntrs.put(e.partition(), e.updateCounter());
+    }
+
+    /**
+     * @return Acknowledge tuple if any entries were added since the last one was produced, otherwise {@code null}.
+     */
+    @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        acknowledgeOnTimeout() {
+        return size > 0 ? acknowledgeData() : null;
+    }
+
+    /**
+     * @return Tuple of a counters snapshot and the collected topology versions; resets versions and size.
+     */
+    private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+        assert size > 0;
+
+        Map<Integer, Long> cntrs = new HashMap<>(updateCntrs); // Copy: the field map stays in use.
+
+        IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+            new IgniteBiTuple<>(cntrs, topVers);
+
+        topVers = U.newHashSet(1); // Fresh set: the previous one was handed out in the result.
+
+        size = 0;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryAcknowledgeBuffer.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
new file mode 100644
index 0000000..f888467
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Marshalled object plus the class name and deployment info used to unmarshal it.
+ */
+class CacheContinuousQueryDeployableObject implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Marshalled form of the object, produced by {@code U.marshal}. */
+    private byte[] bytes;
+
+    /** Name of the class returned by {@code U.detectClass} for the object. */
+    private String clsName;
+
+    /** Deployment info built from the deployment of that class. */
+    private GridDeploymentInfo depInfo;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public CacheContinuousQueryDeployableObject() {
+        // No-op.
+    }
+
+    /**
+     * @param obj Object to deploy and marshal.
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException If the class cannot be deployed or a called helper fails.
+     */
+    protected CacheContinuousQueryDeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+        assert obj != null;
+        assert ctx != null;
+
+        Class cls = U.detectClass(obj);
+
+        clsName = cls.getName();
+
+        GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
+
+        if (dep == null)
+            throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
+
+        depInfo = new GridDeploymentInfoBean(dep);
+
+        bytes = U.marshal(ctx, obj);
+    }
+
+    /**
+     * @param nodeId Node ID passed to the global deployment lookup.
+     * @param ctx Kernal context.
+     * @return Object unmarshalled with the class loader resolved from the obtained deployment.
+     * @throws IgniteCheckedException If no deployment is obtained or a called helper fails.
+     */
+    <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        assert ctx != null;
+
+        GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+            depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+
+        if (dep == null)
+            throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+        return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeByteArray(out, bytes); // Field order here must match readExternal.
+        U.writeString(out, clsName);
+        out.writeObject(depInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        bytes = U.readByteArray(in); // Same field order as writeExternal.
+        clsName = U.readString(in);
+        depInfo = (GridDeploymentInfo)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index a308e39..949ea67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
@@ -36,10 +39,52 @@ public class CacheContinuousQueryEventBuffer {
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 5);
 
     /** */
+    protected final int part;
+
+    /**
+     * @param part Partition number; stored in the {@code part} field.
+     */
+    CacheContinuousQueryEventBuffer(int part) {
+        this.part = part;
+    }
+
+    /** */
     private AtomicReference<Batch> curBatch = new AtomicReference<>();
 
     /** */
-    private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>();
+    private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque<>();
+
+    /** */
+    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+    /**
+     * @param updateCntr Acknowledged counter; queued backup entries with a counter at or below it are removed.
+     */
+    void cleanupBackupQueue(Long updateCntr) {
+        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
+
+        while (it.hasNext()) {
+            CacheContinuousQueryEntry backupEntry = it.next();
+
+            if (backupEntry.updateCounter() <= updateCntr) // Unboxes updateCntr; null would throw NPE here.
+                it.remove();
+        }
+    }
+
+    /**
+     * @return Current backup queue (replaced here by a new empty one), or {@code null} if it is empty.
+     */
+    @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
+        if (!backupQ.isEmpty()) {
+            ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ;
+
+            backupQ = new ConcurrentLinkedDeque<>();
+
+            return ret;
+        }
+
+        return null;
+    }
 
     /**
      * @return Initial partition counter.
@@ -61,26 +106,20 @@ public class CacheContinuousQueryEventBuffer {
 
     /**
      * @param e Entry to process.
+     * @param backup Backup entry flag.
      * @return Collected entries to pass to listener (single entry or entries list).
      */
-    @Nullable Object processEntry(CacheContinuousQueryEntry e) {
-        return process0(e.updateCounter(), e);
-    }
-
-    /**
-     * @param cntr Filtered counter.
-     * @return Collected entries to pass to listener (single entry or entries list).
-     */
-    @Nullable Object processFiltered(long cntr) {
-        return process0(cntr, cntr);
+    @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) {
+        return process0(e.updateCounter(), e, backup);
     }
 
     /**
+     * @param backup Backup entry flag.
      * @param cntr Entry counter.
      * @param entry Entry.
      * @return Collected entries.
      */
-    private Object process0(long cntr, Object entry) {
+    private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
         assert cntr >= 0 : cntr;
 
         Batch batch = initBatch();
@@ -88,13 +127,16 @@ public class CacheContinuousQueryEventBuffer {
         if (batch == null || cntr < batch.startCntr) {
             assert entry != null : cntr;
 
+            if (backup)
+                backupQ.add(entry);
+
             return entry;
         }
 
         Object res = null;
 
         if (cntr <= batch.endCntr)
-            res = batch.processEvent0(null, cntr, entry);
+            res = batch.processEvent0(null, cntr, entry, backup);
         else
             pending.put(cntr, entry);
 
@@ -104,7 +146,7 @@ public class CacheContinuousQueryEventBuffer {
             do {
                 batch = batch0;
 
-                res = processPending(res, batch);
+                res = processPending(res, batch, backup);
 
                 batch0 = curBatch.get();
             }
@@ -139,17 +181,18 @@ public class CacheContinuousQueryEventBuffer {
     /**
      * @param res Current result.
      * @param batch Current batch.
+     * @param backup Backup entry flag.
      * @return New result.
      */
-    @Nullable private Object processPending(@Nullable Object res, Batch batch) {
+    @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
         if (pending.floorKey(batch.endCntr) != null) {
-            for (Map.Entry<Long, Object> p : pending.headMap(batch.endCntr, true).entrySet()) {
+            for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
                 long cntr = p.getKey();
 
                 assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr;
 
                 if (pending.remove(p.getKey()) != null)
-                    res = batch.processEvent0(res, p.getKey(), p.getValue());
+                    res = batch.processEvent0(res, p.getKey(), p.getValue(), backup);
             }
         }
 
@@ -195,13 +238,15 @@ public class CacheContinuousQueryEventBuffer {
          * @param res Current result.
          * @param cntr Event counter.
          * @param evt Event.
+         * @param backup Backup entry flag.
          * @return New result.
          */
         @SuppressWarnings("unchecked")
         @Nullable private Object processEvent0(
             @Nullable Object res,
             long cntr,
-            Object evt) {
+            CacheContinuousQueryEntry evt,
+            boolean backup) {
             int pos = (int)(cntr - startCntr);
 
             synchronized (this) {
@@ -224,9 +269,15 @@ public class CacheContinuousQueryEventBuffer {
 
                                     filtered = 0;
 
-                                    if (res == null)
-                                        res = evt0;
+                                    if (res == null) {
+                                        if (backup)
+                                            backupQ.add(evt0);
+                                        else
+                                            res = evt0;
+                                    }
                                     else {
+                                        assert !backup;
+
                                         List<CacheContinuousQueryEntry> resList;
 
                                         if (res instanceof CacheContinuousQueryEntry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index f7547f5..540f871 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -24,12 +24,9 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -46,13 +43,10 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -67,12 +61,10 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
@@ -80,7 +72,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -93,11 +84,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int BACKUP_ACK_THRESHOLD =
+    static final int BACKUP_ACK_THRESHOLD =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD", 100);
 
     /** */
-    private static final int LSNR_MAX_BUF_SIZE =
+    static final int LSNR_MAX_BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
     /** Cache name. */
@@ -113,7 +104,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
     /** Deployable object for filter. */
-    private DeployableObject rmtFilterDep;
+    private CacheContinuousQueryDeployableObject rmtFilterDep;
 
     /** Internal flag. */
     private boolean internal;
@@ -136,9 +127,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
-    /** Backup queue. */
-    private transient volatile Collection<CacheContinuousQueryEntry> backupQueue;
-
     /** */
     private boolean locCache;
 
@@ -146,13 +134,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private boolean keepBinary;
 
     /** */
-    private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+    private transient ConcurrentMap<Integer, CacheContinuousQueryPartitionRecovery> rcvs;
 
     /** */
     private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> entryBufs;
 
     /** */
-    private transient AcknowledgeBuffer ackBuf;
+    private transient CacheContinuousQueryAcknowledgeBuffer ackBuf;
 
     /** */
     private transient int cacheId;
@@ -167,6 +155,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
+    private transient volatile boolean nodeLeft;
+
+    /** */
     private transient boolean ignoreClsNotFound;
 
     /** */
@@ -341,9 +332,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         entryBufs = new ConcurrentHashMap<>();
 
-        backupQueue = new ConcurrentLinkedDeque8<>();
-
-        ackBuf = new AcknowledgeBuffer();
+        ackBuf = new CacheContinuousQueryAcknowledgeBuffer();
 
         rcvs = new ConcurrentHashMap<>();
 
@@ -413,7 +402,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                 }
                 else {
-                    final boolean notify = filter(evt, primary);
+                    final boolean notify = filter(evt);
 
                     if (log.isDebugEnabled())
                         log.debug("Filter invoked for event [evt=" + evt + ", primary=" + primary
@@ -433,6 +422,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             }, sync);
                         }
                     }
+                    else
+                        handleBackupEntry(cctx, evt.entry());
                 }
             }
 
@@ -442,50 +433,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) {
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-                if (backupQueue0 != null) {
-                    Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator();
-
-                    while (it.hasNext()) {
-                        CacheContinuousQueryEntry backupEntry = it.next();
-
-                        Long updateCntr = updateCntrs.get(backupEntry.partition());
+                for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) {
+                    CacheContinuousQueryEventBuffer buf = entryBufs.get(e.getKey());
 
-                        if (updateCntr != null && backupEntry.updateCounter() <= updateCntr)
-                            it.remove();
-                    }
+                    if (buf != null)
+                        buf.cleanupBackupQueue(e.getValue());
                 }
             }
 
             @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
                 assert topVer != null;
 
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
+                try {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                if (backupQueue0 == null)
-                    return;
+                    ClusterNode node = ctx.discovery().node(nodeId);
 
-                try {
-                    ClusterNode nodeId0 = ctx.discovery().node(nodeId);
+                    for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) {
+                        CacheContinuousQueryEventBuffer buf = bufE.getValue();
 
-                    if (nodeId0 != null) {
-                        GridCacheContext<K, V> cctx = cacheContext(ctx);
+                        Collection<CacheContinuousQueryEntry> backupQueue = buf.resetBackupQueue();
 
-                        for (CacheContinuousQueryEntry e : backupQueue0) {
-                            if (!e.isFiltered())
-                                prepareEntry(cctx, nodeId, e);
+                        if (backupQueue != null && node != null) {
+                            for (CacheContinuousQueryEntry e : backupQueue) {
+                                if (!e.isFiltered())
+                                    prepareEntry(cctx, nodeId, e);
 
-                            e.topologyVersion(topVer);
-                        }
+                                e.topologyVersion(topVer);
+                            }
 
-                        ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue0, topic);
+                            ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
+                        }
                     }
-                    else
-                        // Node which start CQ leave topology. Not needed to put data to backup queue.
-                        backupQueue = null;
-
-                    backupQueue0.clear();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
@@ -509,14 +488,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onPartitionEvicted(int part) {
-                Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-                if (backupQueue0 != null) {
-                    for (Iterator<CacheContinuousQueryEntry> it = backupQueue0.iterator(); it.hasNext(); ) {
-                        if (it.next().partition() == part)
-                            it.remove();
-                    }
-                }
+                entryBufs.remove(part);
             }
 
             @Override public boolean oldValueRequired() {
@@ -743,17 +715,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
+        CacheContinuousQueryPartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
 
     /**
-     * @param primary Primary.
      * @param evt Query event.
      * @return {@code True} if event passed filter otherwise {@code true}.
      */
-    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+    public boolean filter(CacheContinuousQueryEvent evt) {
         CacheContinuousQueryEntry entry = evt.entry();
 
         boolean notify = !entry.isFiltered();
@@ -769,15 +740,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!notify)
             entry.markFiltered();
 
-        if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
-            entry.markBackup();
-
-            Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-            if (backupQueue0 != null)
-                backupQueue0.add(entry.forBackupQueue());
-        }
-
         return notify;
     }
 
@@ -869,7 +831,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (internal)
             return;
 
-        for (PartitionRecovery rec : rcvs.values())
+        for (CacheContinuousQueryPartitionRecovery rec : rcvs.values())
             rec.resetTopologyCache();
     }
 
@@ -879,12 +841,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+    @NotNull private CacheContinuousQueryPartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
         int partId,
         AffinityTopologyVersion topVer) {
         assert topVer != null && topVer.topologyVersion() > 0 : topVer;
 
-        PartitionRecovery rec = rcvs.get(partId);
+        CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
@@ -909,10 +871,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             else if (initUpdCntrs != null)
                 partCntrs = initUpdCntrs.get(partId);
 
-            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
+            rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
+            CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
@@ -924,6 +886,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param cctx Cache context.
      * @param e Entry.
+     */
+    private void handleBackupEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
+        if (internal || e.updateCounter() == -1L || nodeLeft) // Skip internal query and expire entries.
+            return;
+
+        CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
+
+        buf.processEntry(e.forBackupQueue(), true);
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param e Entry.
      * @return Entry.
      */
     private Object handleEntry(final GridCacheContext cctx, CacheContinuousQueryEntry e) {
@@ -942,12 +917,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (e.updateCounter() == -1L)
             return e;
 
-        CacheContinuousQueryEventBuffer buf = entryBufs.get(e.partition());
+        CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, e.partition());
 
-        if (buf == null) {
-            final int part = e.partition();
+        return buf.processEntry(e, false);
+    }
 
-            buf = new CacheContinuousQueryEventBuffer() {
+    /**
+     * @param cctx Cache context.
+     * @param part Partition.
+     * @return Event buffer.
+     */
+    private CacheContinuousQueryEventBuffer partitionBuffer(final GridCacheContext cctx, int part) {
+        CacheContinuousQueryEventBuffer buf = entryBufs.get(part);
+
+        if (buf == null) {
+            buf = new CacheContinuousQueryEventBuffer(part) {
                 @Override protected long currentPartitionCounter() {
                     GridDhtLocalPartition locPart = cctx.topology().localPartition(part, null, false);
 
@@ -958,239 +942,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
             };
 
-            CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(e.partition(), buf);
+            CacheContinuousQueryEventBuffer oldBuf = entryBufs.putIfAbsent(part, buf);
 
             if (oldBuf != null)
                 buf = oldBuf;
         }
 
-        return buf.processEntry(e);
-    }
-
-    /**
-     *
-     */
-    private static class PartitionRecovery {
-        /** Event which means hole in sequence. */
-        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
-
-        /** */
-        private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
-
-        /** */
-        private IgniteLogger log;
-
-        /** */
-        private long lastFiredEvt;
-
-        /** */
-        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
-
-        /** */
-        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
-
-        /**
-         * @param log Logger.
-         * @param topVer Topology version.
-         * @param initCntr Update counters.
-         */
-        PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
-            this.log = log;
-
-            if (initCntr != null) {
-                assert topVer.topologyVersion() > 0 : topVer;
-
-                this.lastFiredEvt = initCntr;
-
-                curTop = topVer;
-            }
-        }
-
-        /**
-         * Resets cached topology.
-         */
-        void resetTopologyCache() {
-            curTop = AffinityTopologyVersion.NONE;
-        }
-
-        /**
-         * Add continuous entry.
-         *
-         * @param cctx Cache context.
-         * @param cache Cache.
-         * @param entry Cache continuous query entry.
-         * @return Collection entries which will be fired. This collection should contains only non-filtered events.
-         */
-        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
-            CacheContinuousQueryEntry entry,
-            GridCacheContext cctx,
-            IgniteCache cache
-        ) {
-            assert entry != null;
-
-            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
-                assert entry.updateCounter() == 0L : entry;
-
-                return F.<CacheEntryEvent<? extends K, ? extends V>>
-                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-            }
-
-            List<CacheEntryEvent<? extends K, ? extends V>> entries;
-
-            synchronized (pendingEvts) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
-                        ", curTop=" + curTop +
-                        ", entUpdCnt=" + entry.updateCounter() +
-                        ", partId=" + entry.partition() +
-                        ", pendingEvts=" + pendingEvts + ']');
-                }
-
-                // Received first event.
-                if (curTop == AffinityTopologyVersion.NONE) {
-                    lastFiredEvt = entry.updateCounter();
-
-                    curTop = entry.topologyVersion();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("First event [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return !entry.isFiltered() ?
-                        F.<CacheEntryEvent<? extends K, ? extends V>>
-                            asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
-                        Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
-                }
-
-                if (curTop.compareTo(entry.topologyVersion()) < 0) {
-                    if (entry.updateCounter() == 1L && !entry.isBackup()) {
-                        entries = new ArrayList<>(pendingEvts.size());
-
-                        for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
-                            if (evt != HOLE && !evt.isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
-                        }
-
-                        pendingEvts.clear();
-
-                        curTop = entry.topologyVersion();
-
-                        lastFiredEvt = entry.updateCounter();
-
-                        if (!entry.isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
-                                ", curTop=" + curTop +
-                                ", entUpdCnt=" + entry.updateCounter() +
-                                ", partId=" + entry.partition() +
-                                ", pendingEvts=" + pendingEvts + ']');
-
-                        return entries;
-                    }
-
-                    curTop = entry.topologyVersion();
-                }
-
-                // Check duplicate.
-                if (entry.updateCounter() > lastFiredEvt)
-                    pendingEvts.put(entry.updateCounter(), entry);
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Skip duplicate continuous query message: " + entry);
-
-                    return Collections.emptyList();
-                }
-
-                if (pendingEvts.isEmpty()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() + ']');
-                    }
-
-                    return Collections.emptyList();
-                }
-
-                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
-
-                entries = new ArrayList<>();
-
-                if (pendingEvts.size() >= MAX_BUFF_SIZE) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() +
-                            ", pendingEvts=" + pendingEvts + ']');
-                    }
-
-                    LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
-                        ", bufSize=" + MAX_BUFF_SIZE +
-                        ", partId=" + entry.partition() + ']');
-
-                    for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
-                        lastFiredEvt = e.getKey();
-
-                        iter.remove();
-                    }
-                }
-                else {
-                    while (iter.hasNext()) {
-                        Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                        CacheContinuousQueryEntry pending = e.getValue();
-
-                        long filtered = pending.filteredCount();
-
-                        boolean fire = e.getKey() == lastFiredEvt + 1;;
-
-                        if (!fire && filtered > 0)
-                            fire = e.getKey() - filtered <= lastFiredEvt + 1;
-
-                        if (fire) {
-                            lastFiredEvt = e.getKey();
-
-                            if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
-
-                            iter.remove();
-                        }
-                        else
-                            break;
-                    }
-                }
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Will send to listener the following events [entries=" + entries +
-                    ", lastFiredEvt=" + lastFiredEvt +
-                    ", curTop=" + curTop +
-                    ", entUpdCnt=" + entry.updateCounter() +
-                    ", partId=" + entry.partition() +
-                    ", pendingEvts=" + pendingEvts + ']');
-            }
-
-            return entries;
-        }
+        return buf;
     }
 
     /** {@inheritDoc} */
     @Override public void onNodeLeft() {
-        Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;
-
-        if (backupQueue0 != null)
-            backupQueue = null;
+        nodeLeft = true;
     }
 
     /** {@inheritDoc} */
@@ -1199,7 +962,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert ctx.config().isPeerClassLoadingEnabled();
 
         if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
-            rmtFilterDep = new DeployableObject(rmtFilter, ctx);
+            rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx);
     }
 
     /** {@inheritDoc} */
@@ -1320,7 +1083,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         boolean b = in.readBoolean();
 
         if (b)
-            rmtFilterDep = (DeployableObject)in.readObject();
+            rmtFilterDep = (CacheContinuousQueryDeployableObject)in.readObject();
         else
             rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
 
@@ -1345,95 +1108,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
-    /** */
-    private static class AcknowledgeBuffer {
-        /** */
-        private int size;
-
-        /** */
-        @GridToStringInclude
-        private Map<Integer, Long> updateCntrs = new HashMap<>();
-
-        /** */
-        @GridToStringInclude
-        private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
-
-        /**
-         * @param batch Batch.
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @SuppressWarnings("unchecked")
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-        onAcknowledged(GridContinuousBatch batch) {
-            assert batch instanceof GridContinuousQueryBatch;
-
-            size += ((GridContinuousQueryBatch)batch).entriesCount();
-
-            Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
-
-            for (CacheContinuousQueryEntry e : entries)
-                addEntry(e);
-
-            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
-        }
-
-        /**
-         * @param e Entry.
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-        onAcknowledged(CacheContinuousQueryEntry e) {
-            size++;
-
-            addEntry(e);
-
-            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
-        }
-
-        /**
-         * @param e Entry.
-         */
-        private void addEntry(CacheContinuousQueryEntry e) {
-            topVers.add(e.topologyVersion());
-
-            Long cntr0 = updateCntrs.get(e.partition());
-
-            if (cntr0 == null || e.updateCounter() > cntr0)
-                updateCntrs.put(e.partition(), e.updateCounter());
-        }
-
-        /**
-         * @return Non-null tuple if acknowledge should be sent to backups.
-         */
-        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
-            acknowledgeOnTimeout() {
-            return size > 0 ? acknowledgeData() : null;
-        }
-
-        /**
-         * @return Tuple with acknowledge information.
-         */
-        private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
-            assert size > 0;
-
-            Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
-
-            IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
-                new IgniteBiTuple<>(cntrs, topVers);
-
-            topVers = U.newHashSet(1);
-
-            size = 0;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(AcknowledgeBuffer.class, this);
-        }
-    }
-
     /**
      *
      */
@@ -1469,44 +1143,38 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         /** {@inheritDoc} */
         @Override public void run() {
-            final boolean notify = filter(evt, primary);
-
-            if (!primary())
-                return;
+            final boolean notify = filter(evt);
 
-            if (fut == null) {
-                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+            if (primary || skipPrimaryCheck) {
+                if (fut == null) {
+                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
 
-                return;
-            }
+                    return;
+                }
 
-            if (fut.isDone()) {
-                if (fut.error() != null)
-                    evt.entry().markFiltered();
+                if (fut.isDone()) {
+                    if (fut.error() != null)
+                        evt.entry().markFiltered();
 
-                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
-            }
-            else {
-                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        if (f.error() != null)
-                            evt.entry().markFiltered();
-
-                        ctx.asyncCallbackPool().execute(new Runnable() {
-                            @Override public void run() {
-                                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
-                            }
-                        }, evt.entry().partition());
-                    }
-                });
+                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                }
+                else {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> f) {
+                            if (f.error() != null)
+                                evt.entry().markFiltered();
+
+                            ctx.asyncCallbackPool().execute(new Runnable() {
+                                @Override public void run() {
+                                    onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                                }
+                            }, evt.entry().partition());
+                        }
+                    });
+                }
             }
-        }
-
-        /**
-         * @return {@code True} if event fired on this node.
-         */
-        private boolean primary() {
-            return primary || skipPrimaryCheck;
+            else
+                handleBackupEntry(cacheContext(ctx), evt.entry());
         }
 
         /** {@inheritDoc} */
@@ -1515,82 +1183,4 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
     }
 
-    /**
-     * Deployable object.
-     */
-    protected static class DeployableObject implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Serialized object. */
-        private byte[] bytes;
-
-        /** Deployment class name. */
-        private String clsName;
-
-        /** Deployment info. */
-        private GridDeploymentInfo depInfo;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public DeployableObject() {
-            // No-op.
-        }
-
-        /**
-         * @param obj Object.
-         * @param ctx Kernal context.
-         * @throws IgniteCheckedException In case of error.
-         */
-        protected DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
-            assert obj != null;
-            assert ctx != null;
-
-            Class cls = U.detectClass(obj);
-
-            clsName = cls.getName();
-
-            GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
-
-            depInfo = new GridDeploymentInfoBean(dep);
-
-            bytes = U.marshal(ctx, obj);
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param ctx Kernal context.
-         * @return Deserialized object.
-         * @throws IgniteCheckedException In case of error.
-         */
-        <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
-            assert ctx != null;
-
-            GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
-                depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
-
-            if (dep == null)
-                throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
-
-            return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeByteArray(out, bytes);
-            U.writeString(out, clsName);
-            out.writeObject(depInfo);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            bytes = U.readByteArray(in);
-            clsName = U.readString(in);
-            depInfo = (GridDeploymentInfo)in.readObject();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 7aef4dd..e48d22e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -44,7 +44,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 
     /** Deployable object for filter factory. */
-    private DeployableObject rmtFilterFactoryDep;
+    private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
 
     /** Event types for JCache API. */
     private byte types;
@@ -122,7 +122,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         super.p2pMarshal(ctx);
 
         if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
-            rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
+            rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
     }
 
     /** {@inheritDoc} */
@@ -167,7 +167,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         boolean b = in.readBoolean();
 
         if (b)
-            rmtFilterFactoryDep = (DeployableObject)in.readObject();
+            rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
         else
             rmtFilterFactory = (Factory)in.readObject();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
new file mode 100644
index 0000000..534ce9c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.jetbrains.annotations.Nullable;
+
+/**
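+ * Restores per-partition event order for continuous queries: buffers entries by update counter and skips duplicates.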
+ */
+class CacheContinuousQueryPartitionRecovery {
+    /** Event which means hole in sequence. */
+    private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+
+    /** */
+    private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
+
+    /** */
+    private IgniteLogger log;
+
+    /** */
+    private long lastFiredEvt;
+
+    /** */
+    private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
+
+    /** */
+    private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
+
+    /**
+     * @param log Logger.
+     * @param topVer Topology version.
+     * @param initCntr Initial update counter for the partition, {@code null} if not known.
+     */
+    CacheContinuousQueryPartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
+        this.log = log;
+
+        if (initCntr != null) {
+            assert topVer.topologyVersion() > 0 : topVer;
+
+            this.lastFiredEvt = initCntr;
+
+            curTop = topVer;
+        }
+    }
+
+    /**
+     * Resets cached topology.
+     */
+    void resetTopologyCache() {
+        curTop = AffinityTopologyVersion.NONE;
+    }
+
+    /**
+     * Add continuous entry.
+     *
+     * @param cctx Cache context.
+     * @param cache Cache.
+     * @param entry Cache continuous query entry.
+     * @return Collection of entries which will be fired. This collection should contain only non-filtered events.
+     */
+    <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+        CacheContinuousQueryEntry entry,
+        GridCacheContext cctx,
+        IgniteCache cache
+    ) {
+        assert entry != null;
+
+        if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+            assert entry.updateCounter() == 0L : entry;
+
+            return F.<CacheEntryEvent<? extends K, ? extends V>>
+                asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+        }
+
+        List<CacheEntryEvent<? extends K, ? extends V>> entries;
+
+        synchronized (pendingEvts) {
+            if (log.isDebugEnabled()) {
+                log.debug("Handling event [lastFiredEvt=" + lastFiredEvt +
+                    ", curTop=" + curTop +
+                    ", entUpdCnt=" + entry.updateCounter() +
+                    ", partId=" + entry.partition() +
+                    ", pendingEvts=" + pendingEvts + ']');
+            }
+
+            // Received first event.
+            if (curTop == AffinityTopologyVersion.NONE) {
+                lastFiredEvt = entry.updateCounter();
+
+                curTop = entry.topologyVersion();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("First event [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() + ']');
+                }
+
+                return !entry.isFiltered() ?
+                    F.<CacheEntryEvent<? extends K, ? extends V>>
+                        asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
+                    Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
+            }
+
+            if (curTop.compareTo(entry.topologyVersion()) < 0) {
+                if (entry.updateCounter() == 1L && !entry.isBackup()) {
+                    entries = new ArrayList<>(pendingEvts.size());
+
+                    for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+                        if (evt != HOLE && !evt.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
+                    }
+
+                    pendingEvts.clear();
+
+                    curTop = entry.topologyVersion();
+
+                    lastFiredEvt = entry.updateCounter();
+
+                    if (!entry.isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Partition was lost [lastFiredEvt=" + lastFiredEvt +
+                            ", curTop=" + curTop +
+                            ", entUpdCnt=" + entry.updateCounter() +
+                            ", partId=" + entry.partition() +
+                            ", pendingEvts=" + pendingEvts + ']');
+
+                    return entries;
+                }
+
+                curTop = entry.topologyVersion();
+            }
+
+            // Check duplicate.
+            if (entry.updateCounter() > lastFiredEvt)
+                pendingEvts.put(entry.updateCounter(), entry);
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Skip duplicate continuous query message: " + entry);
+
+                return Collections.emptyList();
+            }
+
+            if (pendingEvts.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Nothing sent to listener [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() + ']');
+                }
+
+                return Collections.emptyList();
+            }
+
+            Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
+
+            entries = new ArrayList<>();
+
+            if (pendingEvts.size() >= MAX_BUFF_SIZE) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Pending events reached max of buffer size [lastFiredEvt=" + lastFiredEvt +
+                        ", curTop=" + curTop +
+                        ", entUpdCnt=" + entry.updateCounter() +
+                        ", partId=" + entry.partition() +
+                        ", pendingEvts=" + pendingEvts + ']');
+                }
+
+                LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() +
+                    ", bufSize=" + MAX_BUFF_SIZE +
+                    ", partId=" + entry.partition() + ']');
+
+                for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                        entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+
+                    lastFiredEvt = e.getKey();
+
+                    iter.remove();
+                }
+            }
+            else {
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    CacheContinuousQueryEntry pending = e.getValue();
+
+                    long filtered = pending.filteredCount();
+
+                    boolean fire = e.getKey() == lastFiredEvt + 1;;
+
+                    if (!fire && filtered > 0)
+                        fire = e.getKey() - filtered <= lastFiredEvt + 1;
+
+                    if (fire) {
+                        lastFiredEvt = e.getKey();
+
+                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
+
+                        iter.remove();
+                    }
+                    else
+                        break;
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Will send to listener the following events [entries=" + entries +
+                ", lastFiredEvt=" + lastFiredEvt +
+                ", curTop=" + curTop +
+                ", entUpdCnt=" + entry.updateCounter() +
+                ", partId=" + entry.partition() +
+                ", pendingEvts=" + pendingEvts + ']');
+        }
+
+        return entries;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index da951f2..a72dcd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -75,7 +75,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -872,10 +871,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 sendNotification(nodeId, routineId, null, toSnd, orderedTopic, true, null);
         }
         else {
-            LocalRoutineInfo localRoutineInfo = locInfos.get(routineId);
+            LocalRoutineInfo locRoutineInfo = locInfos.get(routineId);
 
-            if (localRoutineInfo != null)
-                localRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
+            if (locRoutineInfo != null)
+                locRoutineInfo.handler().notifyCallback(nodeId, routineId, objs, ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7bf63c0e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
index bc32e00..4710593 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -66,14 +66,14 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
         for (int i = 0; i < 10; i++) {
             int cnt = rnd.nextInt(10_000) + 1;
 
-            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.5f, threads);
-            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.9f, threads);
-            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.99f, threads);
-            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.01f, threads);
-            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 0.f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.5f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.9f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.99f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.01f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(0), cnt, 1, 0.f, threads);
         }
 
-        CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer();
+        CacheContinuousQueryEventBuffer b = new CacheContinuousQueryEventBuffer(0);
 
         long cntr = 1;
 
@@ -106,32 +106,31 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
     {
         List<CacheContinuousQueryEntry> expEntries = new ArrayList<>();
 
-        List<Object> entries = new ArrayList<>();
+        List<CacheContinuousQueryEntry> entries = new ArrayList<>();
 
         long filtered = b.currentFiltered();
 
         for (int i = 0; i < cnt; i++) {
-            if (rnd.nextFloat() < filterRatio) {
-                entries.add(cntr);
+            CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+                0,
+                EventType.CREATED,
+                null,
+                null,
+                null,
+                false,
+                0,
+                cntr,
+                null);
+
 
-                cntr++;
+            entries.add(entry);
+
+            if (rnd.nextFloat() < filterRatio) {
+                entry.markFiltered();
 
                 filtered++;
             }
             else {
-                CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
-                    0,
-                    EventType.CREATED,
-                    null,
-                    null,
-                    null,
-                    false,
-                    0,
-                    cntr,
-                    null);
-
-                entries.add(entry);
-
                 CacheContinuousQueryEntry expEntry = new CacheContinuousQueryEntry(
                     0,
                     EventType.CREATED,
@@ -145,12 +144,12 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
 
                 expEntry.filteredCount(filtered);
 
-                cntr++;
-
                 expEntries.add(expEntry);
 
                 filtered = 0;
             }
+
+            cntr++;
         }
 
         Collections.shuffle(entries, rnd);
@@ -161,12 +160,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
             for (int i = 0; i < entries.size(); i++) {
                 Object o = entries.get(i);
 
-                Object res;
-
-                if (o instanceof Long)
-                    res = b.processFiltered((Long)o);
-                else
-                    res = b.processEntry((CacheContinuousQueryEntry)o);
+                Object res = b.processEntry((CacheContinuousQueryEntry)o, false);
 
                 if (res != null) {
                     if (res instanceof CacheContinuousQueryEntry)
@@ -179,7 +173,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
         else {
             final CyclicBarrier barrier = new CyclicBarrier(threads);
 
-            final ConcurrentLinkedQueue<Object> q = new ConcurrentLinkedQueue<>(entries);
+            final ConcurrentLinkedQueue<CacheContinuousQueryEntry> q = new ConcurrentLinkedQueue<>(entries);
 
             final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 = new ConcurrentSkipListMap<>();
 
@@ -190,12 +184,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest
                     Object o;
 
                     while ((o = q.poll()) != null) {
-                        Object res;
-
-                        if (o instanceof Long)
-                            res = b.processFiltered((Long)o);
-                        else
-                            res = b.processEntry((CacheContinuousQueryEntry)o);
+                        Object res = b.processEntry((CacheContinuousQueryEntry)o, false);
 
                         if (res != null) {
                             if (res instanceof CacheContinuousQueryEntry)


Mime
View raw message