ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [06/36] ignite git commit: IGNITE-426 WIP test and semen reviewed fixes
Date Wed, 04 Nov 2015 14:10:49 GMT
IGNITE-426 WIP test and semen reviewed fixes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b3ee721
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b3ee721
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b3ee721

Branch: refs/heads/ignite-462-2
Commit: 2b3ee7210693f70d69a4393c7e248e123b6ec37c
Parents: dc9aab7
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Oct 8 15:27:28 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Nov 4 17:02:33 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../communication/GridIoMessageFactory.java     |   6 -
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  30 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   6 +-
 .../continuous/CacheContinuousQueryEntry.java   |  70 +-
 .../continuous/CacheContinuousQueryHandler.java | 263 ++---
 .../CacheContinuousQueryListener.java           |  15 -
 .../CacheContinuousQueryLostPartition.java      | 148 ---
 .../continuous/CacheContinuousQueryManager.java |  33 +-
 .../continuous/GridContinuousHandler.java       |   6 -
 ...acheContinuousQueryFailoverAbstractTest.java | 965 ++++++++++++++++---
 13 files changed, 985 insertions(+), 571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index ade7597..dc3842b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -389,11 +389,6 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void partitionLost(String cacheName, int partId) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index e038794..bddebba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -179,11 +179,6 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void partitionLost(String cacheName, int partId) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3474f84..6f71d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -92,7 +92,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder;
@@ -691,11 +690,6 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            case 115:
-                msg = new CacheContinuousQueryLostPartition();
-
-                break;
-
             // [-3..115] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d8fa93c..d23bdf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3249,14 +3249,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     private long nextPartIndex(AffinityTopologyVersion topVer) {
         long updateIdx;
 
-        //U.dumpStack();
-
         if (!cctx.isLocal() && !isNear()) {
             GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
 
             assert locPart != null;
 
-            updateIdx = locPart.nextContinuousQueryUpdateIndex();
+            updateIdx = locPart.nextUpdateIndex();
         }
         else
             updateIdx = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 86f1f41..ba6ff5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -58,18 +58,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -128,8 +116,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
 
-    /** Continuous query update index. */
-    private final AtomicLong contQryUpdIdx = new AtomicLong();
+    /** Update index. */
+    private final AtomicLong updIdx = new AtomicLong();
 
     /**
      * @param cctx Context.
@@ -636,28 +624,28 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /**
      * @return Next update index.
      */
-    public long nextContinuousQueryUpdateIndex() {
-        return contQryUpdIdx.incrementAndGet();
+    public long nextUpdateIndex() {
+        return updIdx.incrementAndGet();
     }
 
     /**
      * @return Current update index.
      */
-    public long continuousQueryUpdateIndex() {
-        return contQryUpdIdx.get();
+    public long updateIndex() {
+        return updIdx.get();
     }
 
     /**
      * @param val Update index value.
      */
-    public void continuousQueryUpdateIndex(long val) {
+    public void updateIndex(long val) {
         while (true) {
-            long val0 = contQryUpdIdx.get();
+            long val0 = updIdx.get();
 
             if (val0 >= val)
                 break;
 
-            if (contQryUpdIdx.compareAndSet(val0, val))
+            if (updIdx.compareAndSet(val0, val))
                 break;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5d312b6..098a60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -939,7 +939,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     Long cntr = cntrMap.get(part.id());
 
                     if (cntr != null)
-                        part.continuousQueryUpdateIndex(cntr);
+                        part.updateIndex(cntr);
                 }
             }
 
@@ -1053,7 +1053,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     Long cntr = cntrMap.get(part.id());
 
                     if (cntr != null)
-                        part.continuousQueryUpdateIndex(cntr);
+                        part.updateIndex(cntr);
                 }
             }
 
@@ -1317,7 +1317,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             for (GridDhtLocalPartition part : locParts.values()) {
                 Long cntr0 = res.get(part.id());
-                Long cntr1 = part.continuousQueryUpdateIndex();
+                Long cntr1 = part.updateIndex();
 
                 if (cntr0 == null || cntr1 > cntr0)
                     res.put(part.id(), cntr1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9e73142..eefbbae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -43,6 +43,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final byte BACKUP_ENTRY = 0b0001;
+
+    /** */
+    private static final byte ORDERED_ENTRY = 0b0010;
+
+    /** */
+    private static final byte FILTERED_ENTRY = 0b0100;
+
+    /** */
     private static final EventType[] EVT_TYPE_VALS = EventType.values();
 
     /**
@@ -82,8 +91,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     /** Update index. */
     private long updateIdx;
 
-    /** */
-    private boolean filtered;
+    /** Flags. */
+    private byte flags;
 
     /** */
     @GridToStringInclude
@@ -91,7 +100,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private AffinityTopologyVersion topVer;
 
     /**
-     * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
+     * Required by {@link Message}.
      */
     public CacheContinuousQueryEntry() {
         // No-op.
@@ -134,13 +143,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
-     * @return Cache ID.
-     */
-    int cacheId() {
-        return cacheId;
-    }
-
-    /**
      * @return Event type.
      */
     EventType eventType() {
@@ -155,24 +157,52 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
-     * Mark this event as filtered.
+     * @return Update index.
+     */
+    long updateIndex() {
+        return updateIdx;
+    }
+
+    /**
+     * Mark that entry create on backup.
+     */
+    void markBackup() {
+        flags |= BACKUP_ENTRY;
+    }
+
+    /**
+     * Mark that entry ordered.
+     */
+    void markOrdered() {
+        flags |= ORDERED_ENTRY;
+    }
+
+    /**
+     * Mark that entry filtered.
      */
     void markFiltered() {
-        filtered = true;
+        flags |= FILTERED_ENTRY;
     }
 
     /**
-     * @return Update index.
+     * @return {@code True} if entry sent by backup node.
      */
-    long updateIndex() {
-        return updateIdx;
+    boolean isBackup() {
+        return (flags & BACKUP_ENTRY) != 0;
+    }
+
+    /**
+     * @return {@code True} .
+     */
+    boolean isOrdered() {
+        return (flags & ORDERED_ENTRY) != 0;
     }
 
     /**
-     * @return Filtered entry.
+     * @return {@code True} if entry was filtered.
      */
     boolean filtered() {
-        return filtered;
+        return (flags & FILTERED_ENTRY) != 0;
     }
 
     /**
@@ -297,7 +327,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeBoolean("filtered", filtered))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
@@ -376,7 +406,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 7:
-                filtered = reader.readBoolean("filtered");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -390,7 +420,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e8c67ab..c537854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -84,9 +86,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** */
     private static final int BACKUP_ACK_THRESHOLD = 100;
 
-    /** */
-    private static final int QUERY_HOLE_THRESHOLD = 5;
-
     /** Cache name. */
     private String cacheName;
 
@@ -291,20 +290,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     if (primary || skipPrimaryCheck) {
                         if (loc) {
                             if (!localCache) {
-                                PartitionRecovery rcv = rcvs.get(entry.partition());
-
-                                if (rcv == null) {
-                                    rcv = new PartitionRecovery(ctx.log(getClass()));
-
-                                    PartitionRecovery oldRec = rcvs.putIfAbsent(entry.partition(), rcv);
-
-                                    if (oldRec != null)
-                                        rcv = oldRec;
-                                }
-
-                                rcv.add(entry);
-
-                                Collection<CacheContinuousQueryEntry> entries = rcv.entries();
+                                Collection<CacheContinuousQueryEntry> entries = handleEntry(ctx, entry);
 
                                 if (!entries.isEmpty()) {
                                     final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
@@ -342,8 +328,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                             ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                         }
                     }
-                    else
+                    else {
+                        entry.markBackup();
+
                         backupQueue.add(entry);
+                    }
                 }
                 catch (ClusterTopologyCheckedException ex) {
                     IgniteLogger log = ctx.log(getClass());
@@ -378,54 +367,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 }
             }
 
-            @Override public void partitionLost(int partId) {
-                assert rcvs != null;
-
-                PartitionRecovery rcv = rcvs.get(partId);
-
-                if (rcv != null)
-                    rcv.reset();
-            }
-
-            @Override public void firePartitionLostEvent(String cacheName0, final int partId) {
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                // Check that cache stopped.
-                if (cctx == null)
-                    return;
-
-                if ((cacheName == null && cacheName0 == null) || // Check default cache.
-                    (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) {
-                    ctx.closure().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                            CacheContinuousQueryLostPartition msg = new CacheContinuousQueryLostPartition(
-                                routineId,
-                                cctx.cacheId(),
-                                partId);
-
-                            try {
-                                cctx.io().send(nodeId, msg, GridIoPolicy.SYSTEM_POOL);
-                            }
-                            catch (ClusterTopologyCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to send lost partition message, node left " +
-                                        "[msg=" + msg + ", nodeId=" + routineId + ']');
-                            }
-                            catch (IgniteCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
-
-                                U.error(log, "Failed to send lost partition message " +
-                                    "[msg=" + msg + ", nodeId=" + routineId + ']', e);
-                            }
-                        }
-                    });
-                }
-            }
-
             @Override public void onUnregister() {
                 if (rmtFilter instanceof PlatformContinuousQueryFilter)
                     ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
@@ -574,31 +515,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-        Map<Integer, PartitionRecovery> parts = new HashMap<>();
-
-        for (CacheContinuousQueryEntry e : entries) {
-            PartitionRecovery rec = parts.containsKey(e.partition()) ?
-                parts.get(e.partition()) : rcvs.get(e.partition());
-
-            if (rec == null) {
-                rec = new PartitionRecovery(ctx.log(getClass()));
-
-                PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
-
-                if (oldRec != null)
-                    rec = oldRec;
-            }
-
-            rec.add(e);
-
-            if (!parts.containsKey(e.partition()))
-                parts.put(e.partition(), rec);
-        }
-
         Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
 
-        for (PartitionRecovery rec : parts.values())
-            entries0.addAll(rec.entries());
+        for (CacheContinuousQueryEntry e : entries)
+            entries0.addAll(handleEntry(ctx, e));
 
         Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
             new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
@@ -617,6 +537,39 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /**
+     * @param ctx Context.
+     * @param e entry.
+     * @return Entry collection.
+     */
+    private Collection<CacheContinuousQueryEntry> handleEntry(GridKernalContext ctx, CacheContinuousQueryEntry e) {
+        assert e != null;
+
+        // Initial query entry or evicted entry.
+        // This events should be fired immediately.
+        if (e.updateIndex() == -1)
+            return F.asList(e);
+
+        PartitionRecovery rec = rcvs.get(e.partition());
+
+        if (rec == null) {
+            rec = new PartitionRecovery(ctx.log(getClass()));
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+
+            if (oldRec != null)
+                rec = oldRec;
+        }
+
+        Collection<CacheContinuousQueryEntry> entries = rec.collectEntries(e);
+
+        if (CacheContinuousQueryManager.SUPER_DEBUG)
+            ctx.log(getClass()).error("Fire the following event for partition : " + e.partition() +
+                " Entries: " + Arrays.toString(entries.toArray()));
+
+        return entries;
+    }
+
+    /**
      *
      */
     private static class PartitionRecovery {
@@ -624,11 +577,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         private IgniteLogger log;
 
         /** */
-        private long lastFiredEvt = 0;
+        private static final long INIT_VALUE = -100;
+
+        /** */
+        private long lastFiredEvt = INIT_VALUE;
 
         /** */
         private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
 
+        private List<T2<Long, CacheContinuousQueryEntry>> firedEvents = new ArrayList<>();
+
         /**
          * @param log Logger.
          */
@@ -639,99 +597,83 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         /**
          * Add continuous entry.
          *
-         * @param e Cache continuous qeury entry.
+         * @param entry Cache continuous query entry.
+         * @return Collection entries which will be fired.
          */
-        public void add(CacheContinuousQueryEntry e) {
-            assert e != null;
+        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
+            assert entry != null;
 
-            synchronized (pendingEnts) {
-                if (!pendingEnts.containsKey(e.updateIndex()) && e.updateIndex() > lastFiredEvt)
-                    pendingEnts.put(e.updateIndex(), e);
-                else if (log.isDebugEnabled())
-                    log.debug("Skip duplicate continuous query message: " + e);
-            }
-        }
-
-        /**
-         * @return Ordered continuous query entries.
-         */
-        public Collection<CacheContinuousQueryEntry> entries() {
-            List<CacheContinuousQueryEntry> entries = new ArrayList<>();
+            List<CacheContinuousQueryEntry> entries;
 
             synchronized (pendingEnts) {
-                if (pendingEnts.isEmpty())
-                    return Collections.emptyList();
+                // Received first event.
+                if (lastFiredEvt == INIT_VALUE) {
+                    if (CacheContinuousQueryManager.SUPER_DEBUG)
+                        log.error("First event. " + entry);
 
-                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+                    lastFiredEvt = entry.updateIndex();
 
-                boolean fired = false;
+                    firedEvents.add(new T2<>(lastFiredEvt, entry));
 
-                // The elements are consistently.
-                while (iter.hasNext()) {
-                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
-
-                    if (e.getKey() == lastFiredEvt + 1) {
-                        ++lastFiredEvt;
-
-                        entries.add(e.getValue());
-
-                        iter.remove();
-
-                        fired = true;
-                    }
+                    return F.asList(entry);
                 }
 
-                if (!fired && lastFiredEvt == 0 && pendingEnts.size() >= QUERY_HOLE_THRESHOLD) {
-                    Long prevCnt = null;
+                // Handle case when nodes owning partition left from topology.
+                if (entry.updateIndex() == 1 && !entry.isBackup()) {
+                    pendingEnts.clear();
 
-                    int orderedCnt = 0;
+                    lastFiredEvt = 1;
 
-                    for (Long cnt : pendingEnts.keySet()) {
-                        if (prevCnt != null) {
-                            if (prevCnt + 1 != cnt)
-                                break;
-                            else
-                                ++orderedCnt;
-                        }
+                    if (CacheContinuousQueryManager.SUPER_DEBUG)
+                        log.error("Lost partition. Start from 1. Entry: " + entry);
 
-                        prevCnt = cnt;
-                    }
+                    firedEvents.add(new T2<>(lastFiredEvt, entry));
 
-                    if (orderedCnt >= QUERY_HOLE_THRESHOLD) {
-                        iter = pendingEnts.entrySet().iterator();
+                    return F.asList(entry);
+                }
 
-                        while (entries.size() < orderedCnt) {
-                            Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+                // Check duplicate.
+                if (entry.updateIndex() > lastFiredEvt) {
+                    if (CacheContinuousQueryManager.SUPER_DEBUG)
+                        log.error("Put message to pending queue. Counter value: " + lastFiredEvt + " Entry: " + entry);
 
-                            entries.add(e.getValue());
+                    pendingEnts.put(entry.updateIndex(), entry);
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Skip duplicate continuous query message: " + entry);
 
-                            lastFiredEvt = e.getKey();
+                    if (CacheContinuousQueryManager.SUPER_DEBUG)
+                        log.error("Received duplicate. Counter value: " + lastFiredEvt + " Entry: " + entry
+                            + ", Proceed message " + Arrays.toString(firedEvents.toArray()));
 
-                            iter.remove();
-                        }
-                    }
+                    return Collections.emptyList();
                 }
-            }
 
-            return entries;
-        }
+                if (pendingEnts.isEmpty())
+                    return Collections.emptyList();
 
-        /**
-         * Reset internal state.
-         */
-        public void reset() {
-            synchronized (pendingEnts) {
                 Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
 
+                entries = new ArrayList<>();
+
+                // Elements are consistently.
                 while (iter.hasNext()) {
                     Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
-                    if (e.getKey() >= lastFiredEvt)
+                    if (e.getKey() == lastFiredEvt + 1) {
+                        ++lastFiredEvt;
+
+                        entries.add(e.getValue());
+
+                        firedEvents.add(new T2<>(e.getKey(), e.getValue()));
+
                         iter.remove();
+                    }
                 }
-
-                lastFiredEvt = 0;
             }
+
+            return entries;
         }
     }
 
@@ -766,17 +708,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
     }
 
-    /** {@inheritDoc} */
-    @Override public void partitionLost(String cacheName, int partId) {
-        if ((this.cacheName == null && cacheName == null) // Check default caches.
-            || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) {
-            PartitionRecovery rcv = rcvs.get(partId);
-
-            if (rcv != null)
-                rcv.reset();
-        }
-    }
-
     /**
      * @param t Acknowledge information.
      * @param routineId Routine ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a706105..2f9e111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -55,21 +55,6 @@ interface CacheContinuousQueryListener<K, V> {
     public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
 
     /**
-     * Fire event that partition lost.
-     *
-     * @param cacheName Cache name.
-     * @param partId Partition ID.
-     */
-    public void firePartitionLostEvent(String cacheName, int partId);
-
-    /**
-     * Handle partition lost event.
-     *
-     * @param partId Partition ID.
-     */
-    public void partitionLost(int partId);
-
-    /**
      * Flushes backup queue.
      *
      * @param ctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
deleted file mode 100644
index eeb20cc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Continuous query entry.
- */
-public class CacheContinuousQueryLostPartition extends GridCacheMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Routine ID. */
-    private UUID routineId;
-
-    /** Partition. */
-    private int part;
-
-    /**
-     * Required by {@link Message}.
-     */
-    public CacheContinuousQueryLostPartition() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param part Partition ID.
-     */
-    CacheContinuousQueryLostPartition(UUID routineId, int cacheId, int part) {
-        this.routineId = routineId;
-        this.cacheId = cacheId;
-        this.part = part;
-    }
-
-    /**
-     * @return Partition.
-     */
-    int partition() {
-        return part;
-    }
-
-    /**
-     * @return Routine ID.
-     */
-    UUID routineId() {
-        return routineId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 115;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 3:
-                if (!writer.writeInt("part", part))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeUuid("routineId", routineId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 3:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                routineId = reader.readUuid("routineId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 5;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheContinuousQueryLostPartition.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index bc68b58..16b40c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -46,9 +46,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -90,6 +87,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** */
     private static final long BACKUP_ACK_FREQ = 5000;
 
+    public static final boolean SUPER_DEBUG = false;
+
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
 
@@ -127,16 +126,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 }
             });
 
-        cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryLostPartition.class,
-            new CI2<UUID, CacheContinuousQueryLostPartition>() {
-                @Override public void apply(UUID uuid, CacheContinuousQueryLostPartition msg) {
-                    CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
-
-                    if (lsnr != null)
-                        lsnr.partitionLost(msg.partition());
-                }
-            });
-
         cctx.time().schedule(new Runnable() {
             @Override public void run() {
                 for (CacheContinuousQueryListener lsnr : lsnrs.values())
@@ -146,20 +135,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                     lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
             }
         }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
-
-        cctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                assert evt instanceof CacheRebalancingEvent;
-
-                CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt;
-
-                for (CacheContinuousQueryListener lsnr : lsnrs.values())
-                    lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
-
-                for (CacheContinuousQueryListener lsnr : intLsnrs.values())
-                    lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
-            }
-        }, EVT_CACHE_REBALANCE_PART_DATA_LOST);
     }
 
     /** {@inheritDoc} */
@@ -319,7 +294,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                    null,
                    lsnr.oldValueRequired() ? oldVal : null,
                    e.partition(),
-                   0,
+                   -1,
                    null);
 
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
@@ -566,7 +541,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                                     e.rawGet(),
                                     null,
                                     0,
-                                    0,
+                                    -1,
                                     null);
 
                                 next = new CacheContinuousQueryEvent<>(

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 975cd2f..40fb12a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -114,12 +114,6 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
 
     /**
-     * @param cacheName Cache name.
-     * @param partId Partition ID.
-     */
-    public void partitionLost(String cacheName, int partId);
-
-    /**
      * @return Topic for ordered notifications. If {@code null}, notifications
      * will be sent in non-ordered messages.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b3ee721/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 61fa6cd..ca754af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -39,6 +39,8 @@ import javax.cache.Cache;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -47,6 +49,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
@@ -71,10 +74,10 @@ import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -279,6 +282,17 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         IgniteCache<Object, Object> clnCache = qryClient.cache(null);
 
+        IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache =
+            new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
+                int cnt = 0;
+
+                @Override public IgniteCache<Integer, Integer> apply() {
+                    ++cnt;
+
+                    return grid(cnt % SRV_NODES + 1).cache(null);
+                }
+            };
+
         Ignite igniteSrv = ignite(0);
 
         IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
@@ -290,16 +304,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         for (int j = 0; j < 50; ++j) {
             ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-            final TestLocalListener lsnr = new TestLocalListener();
+            final CacheEventListener3 lsnr = new CacheEventListener3();
 
             qry.setLocalListener(lsnr);
 
+            qry.setRemoteFilter(lsnr);
+
             int keyIter = 0;
 
             for (; keyIter < keyCnt / 2; keyIter++) {
                 int key = keys.get(keyIter);
 
-                clnCache.put(key, key);
+                rndCache.apply().put(key, key);
             }
 
             assert lsnr.evts.isEmpty();
@@ -312,28 +328,40 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             Affinity<Object> aff = affinity(srvCache);
 
+            boolean filtered = false;
+
             for (; keyIter < keys.size(); keyIter++) {
                 int key = keys.get(keyIter);
 
-                log.info("Put [key=" + key + ", part=" + aff.partition(key) + ']');
+                int val = filtered ? 1 : 2;
+
+                log.info("Put [key=" + key + ", val=" + val + ", part=" + aff.partition(key) + ']');
 
                 T2<Object, Object> t = updates.get(key);
 
                 if (t == null) {
-                    updates.put(key, new T2<>((Object)key, null));
+                    // Check filtered.
+                    if (!filtered) {
+                        updates.put(key, new T2<>((Object)val, null));
 
-                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                        expEvts.add(new T3<>((Object)key, (Object)val, null));
+                    }
                 }
                 else {
-                    updates.put(key, new T2<>((Object)key, (Object)key));
+                    // Check filtered.
+                    if (!filtered) {
+                        updates.put(key, new T2<>((Object)val, (Object)t.get1()));
 
-                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                        expEvts.add(new T3<>((Object)key, (Object)val, (Object)t.get1()));
+                    }
                 }
 
-                srvCache.put(key, key);
+                rndCache.apply().put(key, val);
+
+                filtered = !filtered;
             }
 
-            checkEvents(expEvts, lsnr);
+            checkEvents(expEvts, lsnr, false);
 
             query.close();
         }
@@ -357,7 +385,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final TestLocalListener lsnr = new TestLocalListener();
+        final CacheEventListener3 lsnr = new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -421,7 +449,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             filtered = !filtered;
         }
 
-        checkEvents(expEvts, lsnr);
+        checkEvents(expEvts, lsnr, false);
 
         List<Thread> stopThreads = new ArrayList<>(3);
 
@@ -488,7 +516,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             filtered = !filtered;
         }
 
-        checkEvents(expEvts, lsnr);
+        checkEvents(expEvts, lsnr, false);
 
         query.close();
     }
@@ -518,7 +546,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final TestLocalListener lsnr = new TestLocalListener();
+        final CacheEventListener3 lsnr = new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -599,43 +627,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                 fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']');
             }
 
-            checkEvents(expEvts, lsnr);
+            checkEvents(expEvts, lsnr, false);
         }
 
         cur.close();
     }
 
     /**
-     *
-     */
-    public static class TestLocalListener implements CacheEntryUpdatedListener<Object, Object>,
-        CacheEntryEventSerializableFilter<Object, Object> {
-        /** Keys. */
-        GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
-
-        /** Events. */
-        private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
-
-        /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
-            for (CacheEntryEvent<?, ?> e : events) {
-                System.err.println("Update entry: " + e);
-
-                Integer key = (Integer)e.getKey();
-
-                keys.add(key);
-
-                evts.put(key, e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
-            return (Integer)e.getValue() % 2 == 0;
-        }
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testThreeBackups() throws Exception {
@@ -822,22 +820,145 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @param expEvts Expected events.
      * @param lsnr Listener.
+     * @param lostAllow If {@code true} than won't assert on lost events.
      */
-    private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final TestLocalListener lsnr)
-        throws Exception {
-        assert GridTestUtils.waitForCondition(new PA() {
+    private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
+        boolean lostAllow) throws Exception {
+        boolean b = GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
-                return lsnr.evts.size() == expEvts.size();
+                return expEvts.size() == lsnr.size();
             }
         }, 2000L);
 
+        List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
+
+        for (T3<Object, Object, Object> exp : expEvts) {
+            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+
+            if (rcvdEvts == null || rcvdEvts.isEmpty()) {
+                lostEvents.add(exp);
+
+                continue;
+            }
+
+            Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
+
+            boolean found = false;
+
+            while (iter.hasNext()) {
+                CacheEntryEvent<?, ?> e = iter.next();
+
+                if ((exp.get2() != null && e.getValue() != null && exp.get2() == e.getValue())
+                    && equalOldValue(e, exp)) {
+                    found = true;
+
+                    iter.remove();
+
+                    break;
+                }
+            }
+
+            // Lost event is acceptable.
+            if (!found)
+                lostEvents.add(exp);
+        }
+
+        boolean dup = false;
+
+        // Check duplicate.
+        if (!lsnr.evts.isEmpty()) {
+            for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
+                if (!evts.isEmpty()) {
+                    for (CacheEntryEvent<?, ?> e : evts) {
+                        boolean found = false;
+
+                        for (T3<Object, Object, Object> lostEvt : lostEvents) {
+                            if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())
+                                && equalOldValue(e, lostEvt)) {
+                                found = true;
+
+                                lostEvents.remove(lostEvt);
+
+                                break;
+                            }
+                        }
+
+                        if (!found) {
+                            dup = true;
+
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if (dup) {
+                for (T3<Object, Object, Object> e : lostEvents)
+                    log.error("Lost event: " + e);
+
+                for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values())
+                    if (!e.isEmpty())
+                        log.error("Duplicate event: " + e);
+            }
+
+            assertFalse("Received duplicate events, see log for details.", dup);
+        }
+
+        if (!lostAllow && !lostEvents.isEmpty()) {
+            log.error("Lost event cnt: " + lostEvents.size());
+
+            for (T3<Object, Object, Object> e : lostEvents)
+                log.error("Lost event: " + e);
+
+            assertTrue("Lose events, see log for details.", false);
+        }
+
+        log.error("Lost event cnt: " + lostEvents.size());
+
+        expEvts.clear();
+
+        lsnr.evts.clear();
+    }
+
+    /**
+     * @param e Event
+     * @param expVals expected value
+     * @return {@code True} if entries has the same key, value and oldValue. If cache start without backups
+     *          than oldValue ignoring in comparison.
+     */
+    private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, Object> expVals) {
+        return (e.getOldValue() == null && expVals.get3() == null) // Both null
+            || (e.getOldValue() != null && expVals.get3() != null  // Equals
+                && e.getOldValue().equals(expVals.get3()))
+            || (backups == 0); // If we start without backup than oldValue might be lose.
+    }
+
+    /**
+     * @param expEvts Expected events.
+     * @param lsnr Listener.
+     */
+    private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
+        boolean allowLoseEvent) throws Exception {
+        if (!allowLoseEvent)
+            assert GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return lsnr.evts.size() == expEvts.size();
+                }
+            }, 2000L);
+
         for (T3<Object, Object, Object> exp : expEvts) {
             CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
 
             assertNotNull("No event for key: " + exp.get1(), e);
             assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+
+            if (allowLoseEvent)
+                lsnr.evts.remove(exp.get1());
         }
 
+        if (allowLoseEvent)
+            assert lsnr.evts.isEmpty();
+
         expEvts.clear();
 
         lsnr.evts.clear();
@@ -1058,6 +1179,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
 
+        boolean processorPut = false;
+
         IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 final int idx = SRV_NODES + 1;
@@ -1093,7 +1216,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
 
         try {
-            long stopTime = System.currentTimeMillis() + 3 * 60_000;
+            long stopTime = System.currentTimeMillis() + 1 * 60_000;
 
             final int PARTS = qryClient.affinity(null).partitions();
 
@@ -1110,7 +1233,20 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                 else
                     val = val + 1;
 
-                qryClientCache.put(key, val);
+                if (processorPut && prevVal != null) {
+                    qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                        @Override public Void process(MutableEntry<Object, Object> entry,
+                            Object... arguments) throws EntryProcessorException {
+                            entry.setValue(arguments[0]);
+
+                            return null;
+                        }
+                    }, val);
+                }
+                else
+                    qryClientCache.put(key, val);
+
+                processorPut = !processorPut;
 
                 vals.put(key, val);
 
@@ -1187,8 +1323,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void testMultiThreaded() throws Exception {
-        final int SRV_NODES = 3;
+    public void testFailoverFilter() throws Exception {
+        final int SRV_NODES = 4;
 
         startGridsMultiThreaded(SRV_NODES);
 
@@ -1196,142 +1332,633 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         Ignite qryClient = startGrid(SRV_NODES);
 
-        final IgniteCache<Object, Object> cache = qryClient.cache(null);
+        client = false;
 
-        CacheEventListener1 lsnr = new CacheEventListener1(true);
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        final CacheEventListener2 lsnr = new CacheEventListener2();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
         qry.setLocalListener(lsnr);
 
-        QueryCursor<?> cur = cache.query(qry);
-
-        client = false;
-
-        final int SRV_IDX = SRV_NODES - 1;
+        qry.setRemoteFilter(new CacheEventFilter());
 
-        List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
+        QueryCursor<?> cur = qryClientCache.query(qry);
 
-        final int THREADS = 10;
+        final AtomicBoolean stop = new AtomicBoolean();
 
-        for (int i = 0; i < keys.size(); i++) {
-            log.info("Iteration: " + i);
+        final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
 
-            Ignite srv = ignite(SRV_IDX);
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                final int idx = SRV_NODES + 1;
 
-            TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+                while (!stop.get() && !err) {
+                    log.info("Start node: " + idx);
 
-            spi.sndFirstOnly = new AtomicBoolean(false);
+                    startGrid(idx);
 
-            final Integer key = keys.get(i);
+                    Thread.sleep(3000);
 
-            final AtomicInteger val = new AtomicInteger();
+                    log.info("Stop node: " + idx);
 
-            CountDownLatch latch = new CountDownLatch(THREADS);
+                    stopGrid(idx);
 
-            lsnr.latch = latch;
+                    CountDownLatch latch = new CountDownLatch(1);
 
-            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Integer val0 = val.getAndIncrement();
+                    assertTrue(checkLatch.compareAndSet(null, latch));
 
-                    cache.put(key, val0);
+                    if (!stop.get()) {
+                        log.info("Wait for event check.");
 
-                    return null;
+                        assertTrue(latch.await(1, MINUTES));
+                    }
                 }
-            }, THREADS, "update-thread");
 
-            fut.get();
+                return null;
+            }
+        });
 
-            stopGrid(SRV_IDX);
+        final Map<Integer, Integer> vals = new HashMap<>();
 
-            if (!latch.await(5, SECONDS))
-                fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
+        final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
 
-            assertEquals(THREADS, lsnr.allEvts.size());
+        try {
+            long stopTime = System.currentTimeMillis() + 1 * 60_000;
 
-            Set<Integer> vals = new HashSet<>();
+            final int PARTS = qryClient.affinity(null).partitions();
 
-            boolean err = false;
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-            for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
-                assertEquals(key, evt.getKey());
-                assertNotNull(evt.getValue());
+            boolean filtered = false;
 
-                if (!vals.add((Integer)evt.getValue())) {
-                    err = true;
+            boolean processorPut = false;
 
-                    log.info("Extra event: " + evt);
-                }
-            }
+            while (System.currentTimeMillis() < stopTime) {
+                Integer key = rnd.nextInt(PARTS);
 
-            for (int v = 0; v < THREADS; v++) {
-                if (!vals.contains(v)) {
-                    err = true;
+                Integer prevVal = vals.get(key);
+                Integer val = vals.get(key);
 
-                    log.info("Event for value not received: " + v);
-                }
-            }
+                if (val == null)
+                    val = 0;
+                else
+                    val = Math.abs(val) + 1;
 
-            assertFalse("Invalid events, see log for details.", err);
+                if (filtered)
+                    val = -val;
 
-            lsnr.allEvts.clear();
+                if (processorPut && prevVal != null) {
+                    qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                        @Override public Void process(MutableEntry<Object, Object> entry,
+                            Object... arguments) throws EntryProcessorException {
+                            entry.setValue(arguments[0]);
 
-            startGrid(SRV_IDX);
-        }
+                            return null;
+                        }
+                    }, val);
+                }
+                else
+                    qryClientCache.put(key, val);
 
-        cur.close();
-    }
+                processorPut = !processorPut;
 
-    /**
-     * @param logAll If {@code true} logs all unexpected values.
-     * @param expEvts Expected values.
-     * @param lsnr Listener.
-     * @return Check status.
-     */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    private boolean checkEvents(boolean logAll,
-        Map<Integer, List<T2<Integer, Integer>>> expEvts,
-        CacheEventListener2 lsnr) {
-        assertTrue(!expEvts.isEmpty());
+                vals.put(key, val);
 
-        boolean pass = true;
+                if (val >= 0) {
+                    List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
 
-        for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
-            Integer key = e.getKey();
-            List<T2<Integer, Integer>> exp = e.getValue();
+                    if (keyEvts == null) {
+                        keyEvts = new ArrayList<>();
 
-            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
+                        expEvts.put(key, keyEvts);
+                    }
 
-            if (rcvdEvts == null) {
-                pass = false;
+                    keyEvts.add(new T2<>(val, prevVal));
+                }
 
-                log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
+                filtered = !filtered;
 
-                if (!logAll)
-                    return false;
-            }
-            else {
-                synchronized (rcvdEvts) {
-                    if (rcvdEvts.size() != exp.size()) {
-                        pass = false;
+                CountDownLatch latch = checkLatch.get();
 
-                        log.info("Missed or extra events for key [key=" + key +
-                            ", exp=" + e.getValue() +
-                            ", rcvd=" + rcvdEvts + ']');
+                if (latch != null) {
+                    log.info("Check events.");
 
-                        if (!logAll)
-                            return false;
-                    }
+                    checkLatch.set(null);
 
-                    int cnt = Math.min(rcvdEvts.size(), exp.size());
+                    boolean success = false;
 
-                    for (int i = 0; i < cnt; i++) {
-                        T2<Integer, Integer> expEvt = exp.get(i);
-                        CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+                    try {
+                        if (err)
+                            break;
 
-                        assertEquals(key, rcvdEvt.getKey());
-                        assertEquals(expEvt.get1(), rcvdEvt.getValue());
+                        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return checkEvents(false, expEvts, lsnr);
+                            }
+                        }, 10_000);
+
+                        if (!check)
+                            assertTrue(checkEvents(true, expEvts, lsnr));
+
+                        success = true;
+
+                        log.info("Events checked.");
+                    }
+                    finally {
+                        if (!success)
+                            err = true;
+
+                        latch.countDown();
+                    }
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        CountDownLatch latch = checkLatch.get();
+
+        if (latch != null)
+            latch.countDown();
+
+        restartFut.get();
+
+        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return checkEvents(false, expEvts, lsnr);
+            }
+        }, 10_000);
+
+        if (!check)
+            assertTrue(checkEvents(true, expEvts, lsnr));
+
+        cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailoverStartStopWithoutBackup() throws Exception {
+        failoverStartStopFilter(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailoverStartStopOneBackup() throws Exception {
+        failoverStartStopFilter(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testStartStop() throws Exception {
+        this.backups = 0;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+        final CacheEventListener2 lsnr = new CacheEventListener2();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(new CacheEventFilter());
+
+        QueryCursor<?> cur = qryClnCache.query(qry);
+
+        for (int i = 0; i < 100; i++) {
+            final int idx = i % (SRV_NODES - 1);
+
+            log.info("Stop node: " + idx);
+
+            stopGrid(idx);
+
+            Thread.sleep(200);
+
+            List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+
+            for (int j = 0; j < 10; j++) {
+                Integer oldVal = (Integer)qryClnCache.get(j);
+
+                qryClnCache.put(j, i);
+
+                afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
+            }
+
+            checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+
+            log.info("Start node: " + idx);
+
+            startGrid(idx);
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void failoverStartStopFilter(int backups) throws Exception {
+        this.backups = backups;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+        final CacheEventListener2 lsnr = new CacheEventListener2();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(new CacheEventFilter());
+
+        QueryCursor<?> cur = qryClnCache.query(qry);
+
+        CacheEventListener2 dinLsnr = null;
+
+        QueryCursor<?> dinQry = null;
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get() && !err) {
+                    final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1);
+
+                    log.info("Stop node: " + idx);
+
+                    stopGrid(idx);
+
+                    Thread.sleep(100);
+
+                    log.info("Start node: " + idx);
+
+                    startGrid(idx);
+
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    assertTrue(checkLatch.compareAndSet(null, latch));
+
+                    if (!stop.get()) {
+                        log.info("Wait for event check.");
+
+                        assertTrue(latch.await(1, MINUTES));
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>();
+
+        final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
+
+        try {
+            long stopTime = System.currentTimeMillis() + 60_000;
+
+            // Start new filter each 5 sec.
+            long startFilterTime = System.currentTimeMillis() + 5_000;
+
+            final int PARTS = qryClient.affinity(null).partitions();
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            boolean filtered = false;
+
+            boolean processorPut = false;
+
+            while (System.currentTimeMillis() < stopTime) {
+                Integer key = rnd.nextInt(PARTS);
+
+                Integer prevVal = vals.get(key);
+                Integer val = vals.get(key);
+
+                if (System.currentTimeMillis() > startFilterTime) {
+                    // Stop filter and check events.
+                    if (dinQry != null) {
+                        dinQry.close();
+
+                        log.error("Continuous query listener closed.");
+
+                        checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+                    }
+
+                    dinLsnr = new CacheEventListener2();
+
+                    ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>();
+
+                    newQry.setLocalListener(dinLsnr);
+
+                    newQry.setRemoteFilter(new CacheEventFilter());
+
+                    dinQry = qryClnCache.query(newQry);
+
+                    log.error("Continuous query listener started.");
+
+                    startFilterTime = System.currentTimeMillis() + 5_000;
+                }
+
+                if (val == null)
+                    val = 0;
+                else
+                    val = Math.abs(val) + 1;
+
+                if (filtered)
+                    val = -val;
+
+                if (processorPut && prevVal != null) {
+                    qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                        @Override public Void process(MutableEntry<Object, Object> entry,
+                            Object... arguments) throws EntryProcessorException {
+                            entry.setValue(arguments[0]);
+
+                            return null;
+                        }
+                    }, val);
+                }
+                else
+                    qryClnCache.put(key, val);
+
+                processorPut = !processorPut;
+
+                vals.put(key, val);
+
+                if (val >= 0) {
+                    List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+                    if (keyEvts == null) {
+                        keyEvts = new ArrayList<>();
+
+                        expEvts.put(key, keyEvts);
+                    }
+
+                    keyEvts.add(new T2<>(val, prevVal));
+
+                    T3<Object, Object, Object> tupVal = new T3<>((Object)key, (Object)val, (Object)prevVal);
+
+                    expEvtsLsnr.add(tupVal);
+
+                    if (dinQry != null)
+                        expEvtsNewLsnr.add(tupVal);
+                }
+
+                filtered = !filtered;
+
+                CountDownLatch latch = checkLatch.get();
+
+                if (latch != null) {
+                    log.info("Check events.");
+
+                    checkLatch.set(null);
+
+                    boolean success = false;
+
+                    try {
+                        if (err)
+                            break;
+
+                        checkEvents(expEvtsLsnr, lsnr, backups == 0);
+
+                        success = true;
+
+                        log.info("Events checked.");
+                    }
+                    finally {
+                        if (!success)
+                            err = true;
+
+                        latch.countDown();
+                    }
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        CountDownLatch latch = checkLatch.get();
+
+        if (latch != null)
+            latch.countDown();
+
+        restartFut.get();
+
+        checkEvents(expEvtsLsnr, lsnr, backups == 0);
+
+        lsnr.evts.clear();
+        lsnr.vals.clear();
+
+        if (dinQry != null) {
+            checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+
+            dinLsnr.evts.clear();
+            dinLsnr.vals.clear();
+
+            dinQry.close();
+        }
+
+        List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+
+        for (int i = 0; i < 1024; i++) {
+            Integer oldVal = (Integer)qryClnCache.get(i);
+
+            qryClnCache.put(i, i);
+
+            afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
+        }
+
+        checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+
+        //checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+
+        cur.close();
+
+        if (dinQry != null)
+            dinQry.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        final IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(true);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        client = false;
+
+        final int SRV_IDX = SRV_NODES - 1;
+
+        List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
+
+        final int THREADS = 10;
+
+        for (int i = 0; i < keys.size(); i++) {
+            log.info("Iteration: " + i);
+
+            Ignite srv = ignite(SRV_IDX);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+            spi.sndFirstOnly = new AtomicBoolean(false);
+
+            final Integer key = keys.get(i);
+
+            final AtomicInteger val = new AtomicInteger();
+
+            CountDownLatch latch = new CountDownLatch(THREADS);
+
+            lsnr.latch = latch;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Integer val0 = val.getAndIncrement();
+
+                    cache.put(key, val0);
+
+                    return null;
+                }
+            }, THREADS, "update-thread");
+
+            fut.get();
+
+            stopGrid(SRV_IDX);
+
+            if (!latch.await(5, SECONDS))
+                fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
+
+            assertEquals(THREADS, lsnr.allEvts.size());
+
+            Set<Integer> vals = new HashSet<>();
+
+            boolean err = false;
+
+            for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
+                assertEquals(key, evt.getKey());
+                assertNotNull(evt.getValue());
+
+                if (!vals.add((Integer)evt.getValue())) {
+                    err = true;
+
+                    log.info("Extra event: " + evt);
+                }
+            }
+
+            for (int v = 0; v < THREADS; v++) {
+                if (!vals.contains(v)) {
+                    err = true;
+
+                    log.info("Event for value not received: " + v);
+                }
+            }
+
+            assertFalse("Invalid events, see log for details.", err);
+
+            lsnr.allEvts.clear();
+
+            startGrid(SRV_IDX);
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @param logAll If {@code true} logs all unexpected values.
+     * @param expEvts Expected values.
+     * @param lsnr Listener.
+     * @return Check status.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private boolean checkEvents(boolean logAll,
+        Map<Integer, List<T2<Integer, Integer>>> expEvts,
+        CacheEventListener2 lsnr) {
+        assertTrue(!expEvts.isEmpty());
+
+        boolean pass = true;
+
+        for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
+            Integer key = e.getKey();
+            List<T2<Integer, Integer>> exp = e.getValue();
+
+            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
+
+            if (rcvdEvts == null) {
+                pass = false;
+
+                log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
+
+                if (!logAll)
+                    return false;
+            }
+            else {
+                synchronized (rcvdEvts) {
+                    if (rcvdEvts.size() != exp.size()) {
+                        pass = false;
+
+                        log.info("Missed or extra events for key [key=" + key +
+                            ", exp=" + e.getValue() +
+                            ", rcvd=" + rcvdEvts + ']');
+
+                        if (!logAll)
+                            return false;
+                    }
+
+                    int cnt = Math.min(rcvdEvts.size(), exp.size());
+
+                    for (int i = 0; i < cnt; i++) {
+                        T2<Integer, Integer> expEvt = exp.get(i);
+                        CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+
+                        assertEquals(key, rcvdEvt.getKey());
+                        assertEquals(expEvt.get1(), rcvdEvt.getValue());
                     }
                 }
             }
@@ -1384,7 +2011,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     this.evts.put(evt.getKey(), evt);
 
-                    keys.add((Integer) evt.getKey());
+                    keys.add((Integer)evt.getKey());
 
                     if (allEvts != null)
                         allEvts.add(evt);
@@ -1423,6 +2050,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         /** */
         private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
 
+        /**
+         * @return Count events.
+         */
+        public int size() {
+            int size = 0;
+
+            for (List<CacheEntryEvent<?, ?>> e : evts.values())
+                size += e.size();
+
+            return size;
+        }
+
         /** {@inheritDoc} */
         @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
             throws CacheEntryListenerException  {
@@ -1467,6 +2106,44 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      *
      */
+    public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>,
+        CacheEntryEventSerializableFilter<Object, Object> {
+        /** Keys. */
+        GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+        /** Events. */
+        private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
+            for (CacheEntryEvent<?, ?> e : events) {
+                Integer key = (Integer)e.getKey();
+
+                keys.add(key);
+
+                assert evts.put(key, e) == null;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
+            return (Integer)e.getValue() % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+            return ((Integer)event.getValue()) >= 0;
+        }
+    }
+
+    /**
+     *
+     */
     private static class TestCommunicationSpi extends TcpCommunicationSpi {
         /** */
         @LoggerResource


Mime
View raw message