ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [16/36] ignite git commit: IGNITE-426 Implemented review notes.
Date Wed, 04 Nov 2015 14:10:59 GMT
IGNITE-426 Implemented review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a2fecb1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a2fecb1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a2fecb1

Branch: refs/heads/ignite-462-2
Commit: 0a2fecb1977f653c5d0093682544f1f176fb3b07
Parents: ed3d86e
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Oct 28 15:07:31 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Nov 4 17:02:42 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   6 +
 .../internal/GridMessageListenHandler.java      |   6 +
 .../processors/cache/GridCacheMapEntry.java     |  13 +-
 .../cache/GridCacheUpdateAtomicResult.java      |   1 -
 .../dht/GridDhtPartitionTopologyImpl.java       |   3 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |   4 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../distributed/near/GridNearAtomicCache.java   |   2 +-
 .../CacheContinuousQueryBatchAck.java           |  11 +-
 .../continuous/CacheContinuousQueryEntry.java   |  67 +++++-
 .../continuous/CacheContinuousQueryHandler.java | 220 ++++++++++++++++---
 .../continuous/CacheContinuousQueryManager.java |   1 -
 .../cache/transactions/IgniteTxEntry.java       |  16 +-
 .../continuous/GridContinuousHandler.java       |   6 +
 .../continuous/GridContinuousProcessor.java     |  16 +-
 .../StartRoutineAckDiscoveryMessage.java        |  12 +-
 .../StartRoutineDiscoveryMessage.java           |  18 +-
 ...acheContinuousQueryFailoverAbstractTest.java |  67 +++++-
 18 files changed, 387 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index dc3842b..fc65b55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -129,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateIdx(Map<Integer, Long> idx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index bddebba..7711843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -102,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateIdx(Map<Integer, Long> idx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
         ctx.io().addUserMessageListener(topic, pred);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index e842f61..12f9290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1768,12 +1768,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject oldVal;
         CacheObject updated;
 
-        if (!primary) {
-            int z = 0;
-
-            ++z;
-        }
-
         GridCacheVersion enqueueVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1990,7 +1984,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                 @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
                                     try {
                                         cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0,
-                                                prevVal0, primary0, false, updateIdx00, topVer0);
+                                            prevVal0, primary0, false, updateIdx00, topVer0);
                                     }
                                     catch (IgniteCheckedException e) {
                                         // No-op.
@@ -2412,12 +2406,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 final CacheObject oldVal0 = oldVal;
                 final AffinityTopologyVersion topVer0 = topVer;
                 final long updateIdx00 = updateIdx0;
+                final CacheObject val0 = val;
 
                 contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
                     @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
                         try {
-                            cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0,
-                                false, updateIdx00, topVer0);
+                            cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val0, oldVal0,
+                                primary0, false, updateIdx00, topVer0);
                         }
                         catch (IgniteCheckedException e) {
                             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 9e2aca6..397024b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache;
 
 import javax.cache.processor.EntryProcessor;
-
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a210a29..d30cc88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -964,8 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, Long> cntrMap) {
+        GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 18ac921..de6326e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -191,6 +191,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
      * @param updateIdxs Partition update idxs.
+     * @param addDepInfo Deployment info flag.
      */
     public GridDhtTxFinishRequest(
         UUID nearNodeId,
@@ -215,11 +216,12 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash,
+        boolean addDepInfo,
         Collection<Long> updateIdxs
     ) {
         this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
             sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
-            subjId, taskNameHash);
+            subjId, taskNameHash, addDepInfo);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d26ad97..5d64648 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1804,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() {
                             @Override public void apply(IgniteInternalFuture<Void> f) {
                                 if (f.isDone() && f.error() == null)
-                                        updRes.contQryNtfy().apply(f);
+                                    updRes.contQryNtfy().apply(f);
                                 }
                             });
                     }
@@ -2557,7 +2557,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
+                            /*check version*/!req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             replicate ? DR_BACKUP : DR_NONE,
@@ -2614,7 +2614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         catch (ClusterTopologyCheckedException ignored) {
             U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
-                nodeId);
+                req.nodeId());
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 4f2caa1..706655b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -353,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
+                            /*check version*/!req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             DR_NONE,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
index 1e9a848..f89c466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -97,7 +97,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT,
+                    MessageCollectionItemType.LONG))
                     return false;
 
                 writer.incrementState();
@@ -127,7 +128,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+                updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG,
+                    false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -140,6 +142,11 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public byte directType() {
         return 114;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 896751e..939f7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -93,9 +94,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** */
     @GridToStringInclude
-    @GridDirectTransient
     private AffinityTopologyVersion topVer;
 
+    /** Filtered events. */
+    private GridLongList filteredEvts;
+
     /**
      * Required by {@link Message}.
      */
@@ -179,6 +182,10 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      */
     void markFiltered() {
         flags |= FILTERED_ENTRY;
+        newVal = null;
+        oldVal = null;
+        key = null;
+        depInfo = null;
     }
 
     /**
@@ -191,11 +198,25 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     /**
      * @return {@code True} if entry was filtered.
      */
-    boolean filtered() {
+    boolean isFiltered() {
         return (flags & FILTERED_ENTRY) != 0;
     }
 
     /**
+     * @param idxs Filtered indexes.
+     */
+    void filteredEvents(GridLongList idxs) {
+        filteredEvts = idxs;
+    }
+
+    /**
+     * @return Indexes of previously filtered events, or {@code null} if none were set.
+     */
+    long[] filteredEvents() {
+        return filteredEvts == null ? null : filteredEvts.array();
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
@@ -217,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @throws IgniteCheckedException In case of error.
      */
     void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
-        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+        if (!isFiltered()) {
+            key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
-        if (newVal != null)
-            newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (newVal != null)
+                newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
-        if (oldVal != null)
-            oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (oldVal != null)
+                oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+        }
     }
 
     /**
@@ -322,6 +345,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeMessage("filteredEvts", filteredEvts))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -403,6 +438,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
                 reader.incrementState();
 
+            case 8:
+                filteredEvts = reader.readMessage("filteredEvts");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -410,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index bd44180..8da7ed2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -130,11 +133,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
 
     /** */
+    private transient ConcurrentMap<Integer, HoleBuffer> snds = new ConcurrentHashMap<>();
+
+    /** */
     private transient AcknowledgeBuffer ackBuf;
 
     /** */
     private transient int cacheId;
 
+    /** */
+    private Map<Integer, Long> initUpdIdx;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -187,8 +196,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.skipPrimaryCheck = skipPrimaryCheck;
         this.localCache = locCache;
 
-        rcvs = new ConcurrentHashMap<>();
-
         cacheId = CU.cacheId(cacheName);
     }
 
@@ -213,6 +220,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateIdx(Map<Integer, Long> idx) {
+        this.initUpdIdx = idx;
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;
@@ -229,6 +241,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         ackBuf = new AcknowledgeBuffer();
 
+        rcvs = new ConcurrentHashMap<>();
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         assert !skipPrimaryCheck || loc;
@@ -253,8 +267,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 }
             }
 
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
-                boolean primary,
+            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
                 boolean recordIgniteEvt) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return;
@@ -288,7 +301,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     if (primary || skipPrimaryCheck) {
                         if (loc) {
                             if (!localCache) {
-                                Collection<CacheContinuousQueryEntry> entries = handleEntry(ctx, entry);
+                                Collection<CacheContinuousQueryEntry> entries = clientHandleEvent(ctx, entry);
 
                                 if (!entries.isEmpty()) {
                                     final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
@@ -302,7 +315,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                                         },
                                         new IgnitePredicate<CacheContinuousQueryEntry>() {
                                             @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                                                return !entry.filtered();
+                                                return !entry.isFiltered();
                                             }
                                         }
                                     );
@@ -314,14 +327,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                                 }
                             }
                             else {
-                                if (!entry.filtered())
+                                if (!entry.isFiltered())
                                     locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
                             }
                         }
                         else {
-                            prepareEntry(cctx, nodeId, entry);
+                            if (!entry.isFiltered())
+                                prepareEntry(cctx, nodeId, entry);
 
-                            ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+                            CacheContinuousQueryEntry e = handleEntry(entry);
+
+                            if (e != null)
+                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                         }
                     }
                     else {
@@ -388,8 +405,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 try {
                     GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                    for (CacheContinuousQueryEntry e : backupQueue)
-                        prepareEntry(cctx, nodeId, e);
+                    for (CacheContinuousQueryEntry e : backupQueue) {
+                        if (!e.isFiltered())
+                            prepareEntry(cctx, nodeId, e);
+                    }
 
                     ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
 
@@ -514,7 +533,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
 
         for (CacheContinuousQueryEntry e : entries)
-            entries0.addAll(handleEntry(ctx, e));
+            entries0.addAll(clientHandleEvent(ctx, e));
 
         Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
             new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
@@ -524,7 +543,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             },
             new IgnitePredicate<CacheContinuousQueryEntry>() {
                 @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                    return !entry.filtered();
+                    return !entry.isFiltered();
                 }
             }
         );
@@ -537,7 +556,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param e entry.
      * @return Entry collection.
      */
-    private Collection<CacheContinuousQueryEntry> handleEntry(GridKernalContext ctx, CacheContinuousQueryEntry e) {
+    private Collection<CacheContinuousQueryEntry> clientHandleEvent(GridKernalContext ctx,
+        CacheContinuousQueryEntry e) {
         assert e != null;
 
         // Initial query entry or evicted entry.
@@ -548,7 +568,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         PartitionRecovery rec = rcvs.get(e.partition());
 
         if (rec == null) {
-            rec = new PartitionRecovery(ctx.log(getClass()));
+            rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx), initUpdIdx.get(e.partition()));
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
 
@@ -560,26 +580,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /**
+     * @param e Entry.
+     * @return Entry, or {@code null} if it was buffered as a filtered event.
+     */
+    private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+        assert e != null;
+        assert snds != null;
+
+        // Initial query entry.
+        // Such an event should be fired immediately.
+        if (e.updateIndex() == -1)
+            return e;
+
+        HoleBuffer buf = snds.get(e.partition());
+
+        if (buf == null) {
+            buf = new HoleBuffer();
+
+            HoleBuffer oldRec = snds.putIfAbsent(e.partition(), buf);
+
+            if (oldRec != null)
+                buf = oldRec;
+        }
+
+        return buf.handle(e);
+    }
+
+    /**
      *
      */
     private static class PartitionRecovery {
+        /** Marker entry that denotes a hole in the update index sequence. */
+        private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+
         /** */
         private IgniteLogger log;
 
         /** */
-        private static final long INIT_VALUE = -100;
+        private GridCacheContext cctx;
+
+        /** */
+        private long lastFiredEvt;
 
         /** */
-        private long lastFiredEvt = INIT_VALUE;
+        private AffinityTopologyVersion curTop;
 
         /** */
-        private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
+        private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
 
         /**
          * @param log Logger.
          */
-        public PartitionRecovery(IgniteLogger log) {
+        public PartitionRecovery(IgniteLogger log, GridCacheContext cctx, Long initIdx) {
             this.log = log;
+            this.cctx = cctx;
+
+            if (initIdx != null) {
+                this.lastFiredEvt = initIdx;
+                this.curTop = cctx.topology().topologyVersion();
+            }
         }
 
         /**
@@ -593,26 +652,55 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
             List<CacheContinuousQueryEntry> entries;
 
-            synchronized (pendingEnts) {
+            synchronized (pendingEvts) {
                 // Received first event.
-                if (lastFiredEvt == INIT_VALUE) {
+                if (curTop == null) {
                     lastFiredEvt = entry.updateIndex();
 
+                    curTop = entry.topologyVersion();
+
                     return F.asList(entry);
                 }
 
-                // Handle case when nodes owning partition left from topology.
-                if (entry.updateIndex() == 1 && !entry.isBackup()) {
-                    pendingEnts.clear();
+                if (curTop.compareTo(entry.topologyVersion()) < 0) {
+                    GridCacheAffinityManager aff = cctx.affinity();
 
-                    lastFiredEvt = 1;
+                    if (cctx.affinity().backups(entry.partition(), entry.topologyVersion()).isEmpty() &&
+                        !aff.primary(entry.partition(), curTop).id().equals(aff.primary(entry.partition(),
+                            entry.topologyVersion()).id())) {
+                        entries = new ArrayList<>(pendingEvts.size());
 
-                    return F.asList(entry);
+                        for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+                            if (evt != HOLE && !evt.isFiltered())
+                                entries.add(evt);
+                        }
+
+                        pendingEvts.clear();
+
+                        curTop = entry.topologyVersion();
+
+                        lastFiredEvt = entry.updateIndex();
+
+                        entries.add(entry);
+
+                        return entries;
+                    }
+
+                    curTop = entry.topologyVersion();
                 }
 
                 // Check duplicate.
-                if (entry.updateIndex() > lastFiredEvt)
-                    pendingEnts.put(entry.updateIndex(), entry);
+                if (entry.updateIndex() > lastFiredEvt) {
+                    pendingEvts.put(entry.updateIndex(), entry);
+
+                    // Put filtered events.
+                    if (entry.filteredEvents() != null) {
+                        for (long idx : entry.filteredEvents()) {
+                            if (idx > lastFiredEvt)
+                                pendingEvts.put(idx, HOLE);
+                        }
+                    }
+                }
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Skip duplicate continuous query message: " + entry);
@@ -620,10 +708,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     return Collections.emptyList();
                 }
 
-                if (pendingEnts.isEmpty())
+                if (pendingEvts.isEmpty())
                     return Collections.emptyList();
 
-                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
 
                 entries = new ArrayList<>();
 
@@ -634,10 +722,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                     if (e.getKey() == lastFiredEvt + 1) {
                         ++lastFiredEvt;
 
-                        entries.add(e.getValue());
+                        if (e.getValue() != HOLE && !e.getValue().isFiltered())
+                            entries.add(e.getValue());
 
                         iter.remove();
                     }
+                    else
+                        break;
                 }
             }
 
@@ -645,6 +736,73 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         }
     }
 
+    /**
+     *
+     */
+    private static class HoleBuffer {
+        /** */
+        private final TreeSet<Long> buf = new TreeSet<>();
+
+        /** */
+        private long lastFiredEvt;
+
+        /**
+         * Add continuous entry.
+         *
+         * @param e Cache continuous query entry.
+         * @return Entry to fire, or {@code null} if the filtered entry was buffered.
+         */
+        public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
+            assert e != null;
+
+            synchronized (buf) {
+                // Handle filtered events.
+                if (e.isFiltered()) {
+                    if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1)
+                        return e;
+
+                    buf.add(e.updateIndex());
+
+                    return null;
+                }
+                else {
+                    if (lastFiredEvt < e.updateIndex())
+                        lastFiredEvt = e.updateIndex();
+
+                    // Doesn't have filtered and delayed events.
+                    if (buf.isEmpty() || buf.first() > e.updateIndex())
+                        return e;
+                    else {
+                        GridLongList filteredEvts = new GridLongList(buf.size());
+                        int size = 0;
+
+                        Iterator<Long> iter = buf.iterator();
+
+                        while (iter.hasNext()) {
+                            long idx = iter.next();
+
+                            if (idx < e.updateIndex()) {
+                                filteredEvts.add(idx);
+
+                                iter.remove();
+
+                                ++size;
+                            }
+                            else
+                                break;
+                        }
+
+                        filteredEvts.truncate(size, true);
+
+                        e.filteredEvents(filteredEvts);
+
+                        return e;
+                    }
+                }
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 14fe195..bdd009a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -256,7 +256,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         throws IgniteCheckedException {
         assert e != null;
         assert key != null;
-        assert Thread.holdsLock(e) : e;
 
         if (e.isInternal())
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f5cf501..7d47b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -182,6 +182,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     private byte flags;
 
     /** Partition update index. */
+    @GridDirectTransient
     private long partIdx;
 
     /** */
@@ -953,11 +954,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 writer.incrementState();
 
-            case 12:
-                if (!writer.writeLong("partIdx", partIdx))
-                    return false;
-
-                writer.incrementState();
         }
 
         return true;
@@ -1067,14 +1063,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 reader.incrementState();
 
-            case 12:
-                partIdx = reader.readLong("partIdx");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1087,7 +1075,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 40fb12a..648ed7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.continuous;
 
 import java.io.Externalizable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
@@ -145,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      * @return Cache name if this is a continuous query handler.
      */
     public String cacheName();
+
+    /**
+     * @param idx Initial state of the partition indexes.
+     */
+    public void updateIdx(Map<Integer, Long> idx);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3ed186e..c63a82f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -205,8 +205,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     StartFuture fut = startFuts.remove(msg.routineId());
 
                     if (fut != null) {
-                        if (msg.errs().isEmpty())
+                        if (msg.errs().isEmpty()) {
+                            LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+                            if (routine != null)
+                                routine.handler().updateIdx(msg.updateIdxs());
+
                             fut.onRemoteRegistered();
+                        }
                         else {
                             IgniteCheckedException firstEx = F.first(msg.errs().values());
 
@@ -685,7 +691,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      */
     public void addNotification(UUID nodeId,
         final UUID routineId,
-        Object obj,
+        @Nullable Object obj,
         @Nullable Object orderedTopic,
         boolean sync,
         boolean msg)
@@ -856,6 +862,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         }
 
+        if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
+            Map<Integer, Long> idx = ctx.cache().internalCache(hnd.cacheName()).context().topology().updateCounters();
+
+            req.addUpdateIdxs(idx);
+        }
+
         if (err != null)
             req.addError(ctx.localNodeId(), err);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index bd4aae3..0b5cfaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private final Map<UUID, IgniteCheckedException> errs;
 
+    /** */
+    private final Map<Integer, Long> updateIdxs;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> idx) {
         super(routineId);
 
         this.errs = new HashMap<>(errs);
+        this.updateIdxs = idx;
     }
 
     /** {@inheritDoc} */
@@ -50,6 +55,11 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
         return null;
     }
 
+    /** @return Update indexes. */
+    public Map<Integer, Long> updateIdxs() {
+        return updateIdxs;
+    }
+
     /**
      * @return Errs.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 892adac..cfacde4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     /** */
     private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
 
+    /** */
+    private final Map<Integer, Long> updateIdxes = new HashMap<>();
+
     /**
      * @param routineId Routine id.
      * @param startReqData Start request data.
@@ -63,6 +66,19 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /**
+     * @param idx Update indexes.
+     */
+    public void addUpdateIdxs(Map<Integer, Long> idx) {
+        for (Map.Entry<Integer, Long> e : idx.entrySet()) {
+            Long cntr0 = updateIdxes.get(e.getKey());
+            Long cntr1 = e.getValue();
+
+            if (cntr0 == null || cntr1 > cntr0)
+                updateIdxes.put(e.getKey(), cntr1);
+        }
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -76,7 +92,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateIdxes);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a2fecb1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 90e21ad..0a95036 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -27,7 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -86,10 +90,9 @@ import org.apache.ignite.transactions.Transaction;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -172,7 +175,45 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @return Write order mode for atomic cache.
      */
     protected CacheAtomicWriteOrderMode writeOrderMode() {
-        return CLOCK;
+        return PRIMARY;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFirstFilteredEvent() throws Exception {
+        this.backups = 2;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+        final CacheEventListener3 lsnr = new CacheEventListener3();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(new CacheEventFilter());
+
+        try (QueryCursor<?> cur = qryClnCache.query(qry)) {
+            List<Integer> keys = testKeys(grid(0).cache(null), 1);
+
+            for (Integer key : keys)
+                qryClnCache.put(key, -1);
+
+            qryClnCache.put(keys.get(0), 100);
+        }
+
+        assertEquals(lsnr.evts.size(), 1);
     }
 
     /**
@@ -1222,7 +1263,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     startGrid(idx);
 
-                    Thread.sleep(3000);
+                    Thread.sleep(200);
 
                     log.info("Stop node: " + idx);
 
@@ -1435,7 +1476,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     startGrid(idx);
 
-                    Thread.sleep(3000);
+                    Thread.sleep(200);
 
                     log.info("Stop node: " + idx);
 
@@ -1591,7 +1632,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testFailoverStartStopBackup() throws Exception {
-        failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2);
+        failoverStartStopFilter(2);
     }
 
     /**
@@ -1698,7 +1739,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     stopGrid(idx);
 
-                    Thread.sleep(100);
+                    awaitPartitionMapExchange();
+
+                    Thread.sleep(200);
 
                     log.info("Start node: " + idx);
 
@@ -1706,6 +1749,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     CountDownLatch latch = new CountDownLatch(1);
 
+                    awaitPartitionMapExchange();
+
                     assertTrue(checkLatch.compareAndSet(null, latch));
 
                     if (!stop.get()) {
@@ -1728,7 +1773,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
 
         try {
-            long stopTime = System.currentTimeMillis() + 10_000;
+            long stopTime = System.currentTimeMillis() + 60_000;
 
             // Start new filter each 5 sec.
             long startFilterTime = System.currentTimeMillis() + 5_000;
@@ -1752,7 +1797,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     if (dinQry != null) {
                         dinQry.close();
 
-                        log.error("Continuous query listener closed.");
+                        log.info("Continuous query listener closed.");
 
                         checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
                     }
@@ -1767,7 +1812,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     dinQry = qryClnCache.query(newQry);
 
-                    log.error("Continuous query listener started.");
+                    log.info("Continuous query listener started.");
 
                     startFilterTime = System.currentTimeMillis() + 5_000;
                 }


Mime
View raw message