ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [14/19] ignite git commit: IGNITE-426 Added cache continuos query probe. Implemented for TX.
Date Fri, 23 Oct 2015 11:52:16 GMT
IGNITE-426 Added cache continuos query probe. Implemented for TX.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33910f5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33910f5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33910f5d

Branch: refs/heads/ignite-426-2-reb
Commit: 33910f5d733c456fb6d32a35285b001ffdca40e3
Parents: 9c61114
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Oct 21 15:56:36 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Oct 23 14:50:10 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  39 ++--
 .../cache/GridCacheUpdateTxResult.java          |  30 ++-
 .../GridDistributedTxRemoteAdapter.java         |  20 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   2 +
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  89 +++++++-
 .../continuous/CacheContinuousQueryHandler.java |  30 +--
 .../continuous/CacheContinuousQueryManager.java |   3 -
 .../cache/transactions/IgniteTxEntry.java       |  34 ++-
 .../cache/transactions/IgniteTxHandler.java     |   3 +
 .../transactions/IgniteTxLocalAdapter.java      |  18 +-
 .../cache/transactions/IgniteTxRemoteEx.java    |   7 +-
 .../continuous/GridContinuousProcessor.java     |   3 -
 .../processors/cache/GridCacheTestEntryEx.java  |   8 +-
 ...acheContinuousQueryFailoverAbstractTest.java | 209 ++++++++++++++-----
 ...ueryFailoverAtomicPrimaryWriteOrderTest.java |  14 +-
 ...inuousQueryFailoverAtomicReplicatedTest.java |   3 +-
 .../CacheContinuousQueryFailoverAtomicTest.java |  39 ----
 ...CacheContinuousQueryClientReconnectTest.java | 187 +++++++++++++++++
 .../IgniteCacheContinuousQueryClientTest.java   | 157 ++++++++++++--
 ...cheContinuousQueryClientTxReconnectTest.java |  32 +++
 .../IgniteCacheQuerySelfTestSuite.java          |  14 +-
 .../yardstick/cache/CacheEntryEventProbe.java   | 156 ++++++++++++++
 24 files changed, 942 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index a64752b..28c6e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -355,7 +355,8 @@ public interface GridCacheEntryEx {
         long drExpireTime,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable Long updateIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -390,7 +391,8 @@ public interface GridCacheEntryEx {
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable Long updatePartIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -972,4 +974,4 @@ public interface GridCacheEntryEx {
      * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
      */
     public void onUnlock();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 570172d..c550e7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1016,7 +1016,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         long drExpireTime,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable Long updateIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject old;
 
@@ -1106,6 +1107,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             updateIdx0 = nextPartIndex(topVer);
 
+            if (updateIdx != null && updateIdx != 0)
+                updateIdx0 = updateIdx;
+
             update(val, expireTime, ttl, newVer);
 
             drReplicate(drType, val, newVer);
@@ -1131,7 +1135,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     subjId, null, taskName);
             }
 
-            if (!isNear())
+            if (!isNear() &&
+                // Ignore events on backups for one phase commit.
+                !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
                 cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
@@ -1148,7 +1154,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         if (intercept)
             cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0));
 
-        return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
+        return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateIdx0) :
             new GridCacheUpdateTxResult(false, null);
     }
 
@@ -1174,7 +1180,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridDrType drType,
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable Long updateIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert cctx.transactional();
 
@@ -1261,8 +1268,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             updateIdx0 = nextPartIndex(topVer);
 
-//            if (updateIdx != null)
-//                updateIdx0 = updateIdx;
+            if (updateIdx != null && updateIdx != 0)
+                updateIdx0 = updateIdx;
 
             drReplicate(drType, null, newVer);
 
@@ -1296,7 +1303,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     taskName);
             }
 
-            if (!isNear())
+            if (!isNear() &&
+                // Ignore events on backups for one phase commit.
+                !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
                 cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);
@@ -1347,7 +1356,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             else
                 ret = old;
 
-            return new GridCacheUpdateTxResult(true, ret);
+            return new GridCacheUpdateTxResult(true, ret, updateIdx0);
         }
         else
             return new GridCacheUpdateTxResult(false, null);
@@ -1915,7 +1924,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0);
                     }
                 }
                 else
@@ -1992,7 +2001,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0);
+                        -1);
                 }
             }
 
@@ -2040,7 +2049,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0);
+                        updateIdx0 == null ? 0 : updateIdx);
                 }
             }
             else
@@ -3124,13 +3133,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 else if (deletedUnlocked())
                     deletedUnlocked(false);
 
-                drReplicate(drType, val, ver);
-
-                long updateIdx = -1;
+                long updateIdx = 0;
 
                 if (!preload)
                     updateIdx = nextPartIndex(topVer);
 
+                drReplicate(drType, val, ver);
+
                 if (!skipQryNtf) {
                     cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer);
 
@@ -4228,4 +4237,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return "IteratorEntry [key=" + key + ']';
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index ffda7a2..0f63777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult {
     @GridToStringInclude
     private final CacheObject oldVal;
 
+    /** Partition idx. */
+    private long partIdx;
+
     /**
      * Constructor.
      *
@@ -44,6 +47,31 @@ public class GridCacheUpdateTxResult {
     }
 
     /**
+     * Constructor.
+     *
+     * @param success Success flag.
+     * @param oldVal Old value (if any),
+     */
+    GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long partIdx) {
+        this.success = success;
+        this.oldVal = oldVal;
+        this.partIdx = partIdx;
+    }
+
+    /**
+     * Sets partition idx.
+     *
+     * @param partIdx Partition idx.
+     */
+    public void partIdx(long partIdx) {
+        this.partIdx = partIdx;
+    }
+
+    public long partIdx() {
+        return partIdx;
+    }
+
+    /**
      * @return Success flag.
      */
     public boolean success() {
@@ -61,4 +89,4 @@ public class GridCacheUpdateTxResult {
     @Override public String toString() {
         return S.toString(GridCacheUpdateTxResult.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index f969737..b24ff10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -276,6 +276,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void setPartitionUpdateIdx(long[] idxs) {
+        if (writeMap != null && !writeMap.isEmpty() && idxs != null && idxs.length > 0) {
+            int i = 0;
+
+            for (IgniteTxEntry txEntry : writeMap.values()) {
+                txEntry.partIdx(idxs[i]);
+
+                ++i;
+            }
+        }
+    }
+
     /**
      * Adds completed versions to an entry.
      *
@@ -575,13 +588,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
                                                 topVer, null, replicate ? DR_BACKUP : DR_NONE,
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(), txEntry.partIdx());
                                         else {
                                             cached.innerSet(this, eventNodeId(), nodeId, val, false, false,
                                                 txEntry.ttl(), true, true, topVer, null,
                                                 replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(),
                                                 near() ? null : explicitVer, CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(), txEntry.partIdx());
 
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
@@ -599,7 +612,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     else if (op == DELETE) {
                                         cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true,
                                             topVer, null, replicate ? DR_BACKUP : DR_NONE,
-                                            near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName());
+                                            near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(),
+                                            txEntry.partIdx());
 
                                         // Keep near entry up to date.
                                         if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 098a60d..4616b17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -302,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
+            cntrMap.clear();
+
             // If this is the oldest node.
             if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
                 if (node2part == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 79bccc2..9f2b482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -375,6 +376,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
             add(fut); // Append new future.
 
+            Collection<Long> updateIdxs = F.transform(dhtMapping.entries(), new C1<IgniteTxEntry, Long>() {
+                @Override public Long apply(IgniteTxEntry entry) {
+                    assert entry != null;
+
+                    return entry.partIdx();
+                }
+            });
+
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
                 futId,
@@ -397,7 +406,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.pendingVersions(),
                 tx.size(),
                 tx.subjectId(),
-                tx.taskNameHash());
+                tx.taskNameHash(),
+                updateIdxs);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index be59a95..805a557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Check comitted flag. */
     private boolean checkCommitted;
 
+    /** Partition update counter. */
+    @GridToStringInclude
+    @GridDirectCollection(Long.class)
+    private GridLongList partUpdateCnt;
+
     /** One phase commit write version. */
     private GridCacheVersion writeVer;
 
@@ -160,6 +166,74 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
+     * @param nearNodeId Near node ID.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param topVer Topology version.
+     * @param xidVer Transaction ID.
+     * @param threadId Thread ID.
+     * @param commitVer Commit version.
+     * @param isolation Transaction isolation.
+     * @param commit Commit flag.
+     * @param invalidate Invalidate flag.
+     * @param sys System flag.
+     * @param sysInvalidate System invalidation flag.
+     * @param syncCommit Synchronous commit flag.
+     * @param syncRollback Synchronous rollback flag.
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
+     * @param pendingVers Pending versions.
+     * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param updateIdxs Partition update idxs.
+     */
+    public GridDhtTxFinishRequest(
+        UUID nearNodeId,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        @NotNull AffinityTopologyVersion topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        long threadId,
+        TransactionIsolation isolation,
+        boolean commit,
+        boolean invalidate,
+        boolean sys,
+        byte plc,
+        boolean sysInvalidate,
+        boolean syncCommit,
+        boolean syncRollback,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
+        Collection<GridCacheVersion> pendingVers,
+        int txSize,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        Collection<Long> updateIdxs
+    ) {
+        this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
+            sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
+            subjId, taskNameHash);
+
+        if (updateIdxs != null && !updateIdxs.isEmpty()) {
+            partUpdateCnt = new GridLongList(updateIdxs.size());
+
+            for (Long idx : updateIdxs)
+                partUpdateCnt.add(idx);
+        }
+    }
+
+    /**
+     * @return Partition update counters.
+     */
+    public GridLongList partUpdateCounters(){
+        return partUpdateCnt;
+    }
+
+    /**
      * @return Mini ID.
      */
     public IgniteUuid miniId() {
@@ -326,6 +400,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 writer.incrementState();
 
+            case 28:
+                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -426,6 +505,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
+            case 28:
+                partUpdateCnt = reader.readMessage("partUpdateCnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtTxFinishRequest.class);
@@ -438,6 +525,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 28;
+        return 29;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4734998..520dd46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -316,10 +316,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                                 }
                             }
                             else {
-                                locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-
-                                if (!skipPrimaryCheck)
-                                    sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                                if (!entry.filtered())
+                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
                             }
                         }
                         else {
@@ -560,13 +558,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 rec = oldRec;
         }
 
-        Collection<CacheContinuousQueryEntry> entries = rec.collectEntries(e);
-
-        if (CacheContinuousQueryManager.SUPER_DEBUG)
-            ctx.log(getClass()).error("Fire the following event for partition : " + e.partition() +
-                " Entries: " + Arrays.toString(entries.toArray()));
-
-        return entries;
+        return rec.collectEntries(e);
     }
 
     /**
@@ -608,9 +600,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             synchronized (pendingEnts) {
                 // Received first event.
                 if (lastFiredEvt == INIT_VALUE) {
-                    if (CacheContinuousQueryManager.SUPER_DEBUG)
-                        log.error("First event. " + entry);
-
                     lastFiredEvt = entry.updateIndex();
 
                     firedEvents.add(new T2<>(lastFiredEvt, entry));
@@ -624,29 +613,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                     lastFiredEvt = 1;
 
-                    if (CacheContinuousQueryManager.SUPER_DEBUG)
-                        log.error("Lost partition. Start from 1. Entry: " + entry);
-
                     firedEvents.add(new T2<>(lastFiredEvt, entry));
 
                     return F.asList(entry);
                 }
 
                 // Check duplicate.
-                if (entry.updateIndex() > lastFiredEvt) {
-                    if (CacheContinuousQueryManager.SUPER_DEBUG)
-                        log.error("Put message to pending queue. Counter value: " + lastFiredEvt + " Entry: " + entry);
-
+                if (entry.updateIndex() > lastFiredEvt)
                     pendingEnts.put(entry.updateIndex(), entry);
-                }
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Skip duplicate continuous query message: " + entry);
 
-                    if (CacheContinuousQueryManager.SUPER_DEBUG)
-                        log.error("Received duplicate. Counter value: " + lastFiredEvt + " Entry: " + entry
-                            + ", Proceed message " + Arrays.toString(firedEvents.toArray()));
-
                     return Collections.emptyList();
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 7c04053..680a96c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -65,7 +65,6 @@ import static javax.cache.event.EventType.EXPIRED;
 import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 
 /**
@@ -87,8 +86,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** */
     private static final long BACKUP_ACK_FREQ = 5000;
 
-    public static final boolean SUPER_DEBUG = false;
-
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3c33d19..541ffae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -175,6 +175,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      */
     private byte flags;
 
+    /** Partition update index. */
+    private long partIdx;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -363,6 +366,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * Sets partition index.
+     *
+     * @param partIdx Partition index.
+     */
+    public void partIdx(long partIdx) {
+        this.partIdx = partIdx;
+    }
+
+    /**
+     * @return Partition index.
+     */
+    public long partIdx() {
+        return partIdx;
+    }
+
+    /**
      * @param val Value to set.
      */
     void setAndMarkValid(CacheObject val) {
@@ -901,6 +920,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 writer.incrementState();
 
+            case 11:
+                if (!writer.writeLong("partIdx", partIdx))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -1002,6 +1026,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
                 reader.incrementState();
 
+            case 11:
+                partIdx = reader.readLong("partIdx");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1014,7 +1046,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index c2cc629..59209f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -979,6 +979,9 @@ public class IgniteTxHandler {
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
 
+                tx.setPartitionUpdateIdx(
+                    req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null);
+
                 tx.commit();
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index aa0ffe8..2ea1b61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -930,7 +930,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             txEntry.conflictExpireTime(),
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            null);
+
+                                        if (updRes.success())
+                                            txEntry.partIdx(updRes.partIdx());
 
                                         if (nearCached != null && updRes.success())
                                             nearCached.innerSet(
@@ -949,7 +953,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 txEntry.conflictExpireTime(),
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                null);
                                     }
                                     else if (op == DELETE) {
                                         GridCacheUpdateTxResult updRes = cached.innerRemove(
@@ -965,7 +970,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             cached.detached()  ? DR_NONE : drType,
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            null);
+
+                                        if (updRes.success())
+                                            txEntry.partIdx(updRes.partIdx());
 
                                         if (nearCached != null && updRes.success())
                                             nearCached.innerRemove(
@@ -981,7 +990,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 DR_NONE,
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                null);
                                     }
                                     else if (op == RELOAD) {
                                         cached.innerReload();

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 9660e4e..845f4f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -43,4 +43,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
      * @return {@code True} if entry was found.
      */
     public boolean setWriteValue(IgniteTxEntry e);
-}
\ No newline at end of file
+
+    /**
+     * @param idxs Partition update indexes.
+     */
+    public void setPartitionUpdateIdx(long[] idxs);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index c7676d2..3ed186e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
@@ -74,9 +73,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 26d1f91..44984d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -457,7 +457,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         boolean metrics,
         AffinityTopologyVersion topVer,
         CacheEntryPredicate[] filter, GridDrType drType,
-        long drExpireTime, @Nullable GridCacheVersion drVer, UUID subjId, String taskName) throws IgniteCheckedException,
+        long drExpireTime, @Nullable GridCacheVersion drVer, UUID subjId, String taskName, @Nullable Long updateIdx)
+        throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
     }
@@ -530,7 +531,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         GridDrType drType,
         @Nullable GridCacheVersion drVer,
         UUID subjId,
-        String taskName
+        String taskName,
+        Long updatePartIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;
 
@@ -859,4 +861,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     @Override public void onUnlock() {
         // No-op.
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index ca754af..6029761 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
+import javax.cache.CacheException;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -55,6 +56,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -84,6 +86,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -297,7 +300,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
 
-        List<Integer> keys = testKeys(srvCache, 1);
+        List<Integer> keys = testKeys(srvCache, 3);
 
         int keyCnt = keys.size();
 
@@ -371,6 +374,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testLeftPrimaryAndBackupNodes() throws Exception {
+        if (cacheMode() == REPLICATED)
+            return;
+
         this.backups = 1;
 
         final int SRV_NODES = 3;
@@ -485,7 +491,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                 return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
                     - 1 /** Primary node */ - backups;
             }
-        }, 10000L);
+        }, 5000L);
 
         for (; keyIter < keys.size(); keyIter++) {
             int key = keys.get(keyIter);
@@ -560,7 +566,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
 
-        for (int i = 0; i < SRV_NODES - 1; i++) {
+        for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
             log.info("Stop iteration: " + i);
 
             TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
@@ -654,7 +660,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
-        this.backups = backups;
+        this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups :
+            backups < 2 ? 2 : backups;
 
         final int SRV_NODES = 4;
 
@@ -668,9 +675,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
 
-        if (cacheMode() != REPLICATED)
-            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
-
         Affinity<Object> aff = qryClient.affinity(null);
 
         CacheEventListener1 lsnr = new CacheEventListener1(false);
@@ -687,7 +691,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
 
-        for (int i = 0; i < SRV_NODES - 1; i++) {
+        for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
             log.info("Stop iteration: " + i);
 
             TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
@@ -709,6 +713,39 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                 T2<Object, Object> t = updates.get(key);
 
+                if (updateFromClient) {
+                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+                        try (Transaction tx = qryClient.transactions().txStart()) {
+                            qryClientCache.put(key, key);
+
+                            tx.commit();
+                        }
+                        catch (CacheException | ClusterTopologyException e) {
+                            log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+                            continue;
+                        }
+                    }
+                    else
+                        qryClientCache.put(key, key);
+                }
+                else {
+                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+                        try (Transaction tx = ignite.transactions().txStart()) {
+                            cache.put(key, key);
+
+                            tx.commit();
+                        }
+                        catch (CacheException | ClusterTopologyException e) {
+                            log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+                            continue;
+                        }
+                    }
+                    else
+                        cache.put(key, key);
+                }
+
                 if (t == null) {
                     updates.put(key, new T2<>((Object)key, null));
 
@@ -720,11 +757,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
                 }
 
-                if (updateFromClient)
-                    qryClientCache.put(key, key);
-                else
-                    cache.put(key, key);
-
                 if (first) {
                     spi.skipMsg = true;
 
@@ -747,7 +779,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             checkEvents(expEvts, lsnr);
         }
 
-        for (int i = 0; i < SRV_NODES - 1; i++) {
+        for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
             log.info("Start iteration: " + i);
 
             Ignite ignite = startGrid(i);
@@ -782,7 +814,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     cache.put(key, key);
             }
 
-            if (!latch.await(5, SECONDS)) {
+            if (!latch.await(10, SECONDS)) {
                 Set<Integer> keys0 = new HashSet<>(keys);
 
                 keys0.removeAll(lsnr.keys);
@@ -824,7 +856,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
         boolean lostAllow) throws Exception {
-        boolean b = GridTestUtils.waitForCondition(new PA() {
+        GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 return expEvts.size() == lsnr.size();
             }
@@ -910,7 +942,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
             for (T3<Object, Object, Object> e : lostEvents)
                 log.error("Lost event: " + e);
 
-            assertTrue("Lose events, see log for details.", false);
+            fail("Lose events, see log for details.");
         }
 
         log.error("Lost event cnt: " + lostEvents.size());
@@ -1155,17 +1187,19 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testFailover() throws Exception {
+        this.backups = 2;
+
         final int SRV_NODES = 4;
 
         startGridsMultiThreaded(SRV_NODES);
 
         client = true;
 
-        Ignite qryClient = startGrid(SRV_NODES);
+        final Ignite qryCln = startGrid(SRV_NODES);
 
         client = false;
 
-        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+        final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
 
         final CacheEventListener2 lsnr = new CacheEventListener2();
 
@@ -1173,7 +1207,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         qry.setLocalListener(lsnr);
 
-        QueryCursor<?> cur = qryClientCache.query(qry);
+        QueryCursor<?> cur = qryClnCache.query(qry);
 
         final AtomicBoolean stop = new AtomicBoolean();
 
@@ -1194,7 +1228,12 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
                     log.info("Stop node: " + idx);
 
-                    stopGrid(idx);
+                    try {
+                        stopGrid(idx);
+                    }
+                    catch (Exception e) {
+                        log.warning("Failed to stop nodes.", e);
+                    }
 
                     CountDownLatch latch = new CountDownLatch(1);
 
@@ -1216,9 +1255,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
 
         try {
-            long stopTime = System.currentTimeMillis() + 1 * 60_000;
+            long stopTime = System.currentTimeMillis() + 60_000;
 
-            final int PARTS = qryClient.affinity(null).partitions();
+            final int PARTS = qryCln.affinity(null).partitions();
 
             ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
@@ -1234,17 +1273,51 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     val = val + 1;
 
                 if (processorPut && prevVal != null) {
-                    qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
-                        @Override public Void process(MutableEntry<Object, Object> entry,
-                            Object... arguments) throws EntryProcessorException {
-                            entry.setValue(arguments[0]);
+                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+                        try (Transaction tx = qryCln.transactions().txStart()) {
+                            qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                                @Override public Void process(MutableEntry<Object, Object> e,
+                                    Object... arg) throws EntryProcessorException {
+                                    e.setValue(arg[0]);
+
+                                    return null;
+                                }
+                            }, val);
+
+                            tx.commit();
+                        }
+                        catch (CacheException | ClusterTopologyException e) {
+                            log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
 
-                            return null;
+                            continue;
                         }
-                    }, val);
+                    }
+                    else
+                        qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+                            @Override public Void process(MutableEntry<Object, Object> e,
+                                Object... arg) throws EntryProcessorException {
+                                e.setValue(arg[0]);
+
+                                return null;
+                            }
+                        }, val);
+                }
+                else {
+                    if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+                        try (Transaction tx = qryCln.transactions().txStart()) {
+                            qryClnCache.put(key, val);
+
+                            tx.commit();
+                        }
+                        catch (CacheException | ClusterTopologyException e) {
+                            log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
+
+                            continue;
+                        }
+                    }
+                    else
+                        qryClnCache.put(key, val);
                 }
-                else
-                    qryClientCache.put(key, val);
 
                 processorPut = !processorPut;
 
@@ -1306,11 +1379,14 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         restartFut.get();
 
-        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return checkEvents(false, expEvts, lsnr);
-            }
-        }, 10_000);
+        boolean check = true;
+
+        if (!expEvts.isEmpty())
+            check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return checkEvents(false, expEvts, lsnr);
+                }
+            }, 10_000);
 
         if (!check)
             assertTrue(checkEvents(true, expEvts, lsnr));
@@ -1324,6 +1400,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testFailoverFilter() throws Exception {
+        this.backups = 2;
+
         final int SRV_NODES = 4;
 
         startGridsMultiThreaded(SRV_NODES);
@@ -1385,7 +1463,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
 
         try {
-            long stopTime = System.currentTimeMillis() + 1 * 60_000;
+            long stopTime = System.currentTimeMillis() + 60_000;
 
             final int PARTS = qryClient.affinity(null).partitions();
 
@@ -1510,15 +1588,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void testFailoverStartStopOneBackup() throws Exception {
-        failoverStartStopFilter(1);
+    public void testFailoverStartStopBackup() throws Exception {
+        failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void _testStartStop() throws Exception {
-        this.backups = 0;
+    public void testStartStop() throws Exception {
+        this.backups = 2;
 
         final int SRV_NODES = 4;
 
@@ -1532,6 +1610,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
 
+        Affinity<Object> aff = qryClient.affinity(null);
+
         final CacheEventListener2 lsnr = new CacheEventListener2();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
@@ -1542,18 +1622,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         QueryCursor<?> cur = qryClnCache.query(qry);
 
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 20; i++) {
             final int idx = i % (SRV_NODES - 1);
 
             log.info("Stop node: " + idx);
 
             stopGrid(idx);
 
-            Thread.sleep(200);
+            awaitPartitionMapExchange();
 
             List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
 
-            for (int j = 0; j < 10; j++) {
+            for (int j = 0; j < aff.partitions(); j++) {
                 Integer oldVal = (Integer)qryClnCache.get(j);
 
                 qryClnCache.put(j, i);
@@ -1646,7 +1726,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
 
         try {
-            long stopTime = System.currentTimeMillis() + 60_000;
+            long stopTime = System.currentTimeMillis() + 10_000;
 
             // Start new filter each 5 sec.
             long startFilterTime = System.currentTimeMillis() + 5_000;
@@ -1785,13 +1865,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             dinLsnr.evts.clear();
             dinLsnr.vals.clear();
-
-            dinQry.close();
         }
 
         List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
 
-        for (int i = 0; i < 1024; i++) {
+        for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
             Integer oldVal = (Integer)qryClnCache.get(i);
 
             qryClnCache.put(i, i);
@@ -1801,12 +1879,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
 
-        //checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
-
         cur.close();
 
-        if (dinQry != null)
+        if (dinQry != null) {
+            checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+
             dinQry.close();
+        }
 
         assertFalse("Unexpected error during test, see log for details.", err);
     }
@@ -1815,6 +1894,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testMultiThreaded() throws Exception {
+        this.backups = 2;
+
         final int SRV_NODES = 3;
 
         startGridsMultiThreaded(SRV_NODES);
@@ -1957,8 +2038,24 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                         T2<Integer, Integer> expEvt = exp.get(i);
                         CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
 
-                        assertEquals(key, rcvdEvt.getKey());
-                        assertEquals(expEvt.get1(), rcvdEvt.getValue());
+                        if (pass) {
+                            assertEquals(key, rcvdEvt.getKey());
+                            assertEquals(expEvt.get1(), rcvdEvt.getValue());
+                        }
+                        else {
+                            if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue()))
+                                log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey()
+                                    + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]");
+                        }
+                    }
+
+                    if (!pass) {
+                        for (int i = cnt; i < exp.size(); i++) {
+                            T2<Integer, Integer> val = exp.get(i);
+
+                            log.warning("Missed events. [key=" + key + ", expVal=" + val.get1()
+                                + ", prevVal=" + val.get2() + "]");
+                        }
                     }
                 }
             }
@@ -2168,7 +2265,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             if (msg0 instanceof GridContinuousMessage) {
                 if (skipMsg) {
-                    log.info("Skip continuous message: " + msg0);
+                    if (log.isDebugEnabled())
+                        log.debug("Skip continuous message: " + msg0);
 
                     return;
                 }
@@ -2176,7 +2274,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                     AtomicBoolean sndFirstOnly = this.sndFirstOnly;
 
                     if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
-                        log.info("Skip continuous message: " + msg0);
+                        if (log.isDebugEnabled())
+                            log.debug("Skip continuous message: " + msg0);
 
                         return;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
index 4ddcf0d..8bd7ea7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
@@ -18,15 +18,27 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 
 /**
  *
  */
-public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest {
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAbstractTest {
     /** {@inheritDoc} */
     @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
         return PRIMARY;
     }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
index 8fc58d3..db5b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -26,7 +26,8 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
 /**
  *
  */
-public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest {
+public class CacheContinuousQueryFailoverAtomicReplicatedTest
+    extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest {
     /** {@inheritDoc} */
     @Override protected CacheMode cacheMode() {
         return REPLICATED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
deleted file mode 100644
index fb50387..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return PARTITIONED;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
new file mode 100644
index 0000000..560f2e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.resources.LoggerResource;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicMode());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @return Atomic mode.
+     */
+    protected CacheAtomicityMode atomicMode() {
+        return ATOMIC;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClient() throws Exception {
+        Ignite client = grid(serverCount());
+
+        Ignite srv = clientRouter(client);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        IgniteCache<Object, Object> clnCache = client.cache(null);
+
+        QueryCursor<?> cur = clnCache.query(qry);
+
+        int keyCnt = 100;
+
+        for (int i = 0; i < 30; i++) {
+            lsnr.latch = new CountDownLatch(keyCnt);
+
+            for (int key = 0; key < keyCnt; key++)
+                clnCache.put(key, key);
+
+            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+            reconnectClientNode(client, srv, null);
+
+            lsnr.latch = new CountDownLatch(keyCnt);
+
+            for (int key = 0; key < keyCnt; key++)
+                clnCache.put(key, key);
+
+            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClientAndLeftRouter() throws Exception {
+        Ignite client = grid(serverCount());
+
+        final Ignite srv = clientRouter(client);
+
+        final String clnRouterName = srv.name();
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        IgniteCache<Object, Object> clnCache = client.cache(null);
+
+        QueryCursor<?> cur = clnCache.query(qry);
+
+        int keyCnt = 100;
+
+        lsnr.latch = new CountDownLatch(keyCnt);
+
+        for (int key = 0; key < keyCnt; key++)
+            clnCache.put(key, key);
+
+        assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                stopGrid(clnRouterName);
+            }
+        });
+
+        assertFalse("Client connected to the same server node.", clnRouterName.equals(clientRouter(client).name()));
+
+        lsnr.latch = new CountDownLatch(keyCnt);
+
+        for (int key = 0; key < keyCnt; key++)
+            clnCache.put(key, key);
+
+        assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+        cur.close();
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        private volatile CountDownLatch latch = new CountDownLatch(1);
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
+                log.info("Received cache event: " + evt);
+
+                latch.countDown();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 1afeb05..534f298 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -27,11 +27,13 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -83,11 +85,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
 
         client = true;
 
-        Ignite clientNode = startGrid(3);
+        final int CLIENT_ID = 3;
+
+        Ignite clientNode = startGrid(CLIENT_ID);
 
         client = false;
 
-        CacheEventListener lsnr = new CacheEventListener();
+        final CacheEventListener lsnr = new CacheEventListener();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -95,27 +99,154 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
 
         QueryCursor<?> cur = clientNode.cache(null).query(qry);
 
-        Ignite joined1 = startGrid(4);
+        for (int i = 0; i < 10; i++) {
+            log.info("Start iteration: " + i);
+
+            lsnr.latch = new CountDownLatch(1);
+
+            Ignite joined1 = startGrid(4);
+
+            IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+
+            joinedCache1.put(primaryKey(joinedCache1), 1);
+
+            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+            lsnr.latch = new CountDownLatch(1);
+
+            Ignite joined2 = startGrid(5);
+
+            IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
 
-        IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+            joinedCache2.put(primaryKey(joinedCache2), 2);
 
-        joinedCache1.put(primaryKey(joinedCache1), 1);
+            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
 
-        assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+            stopGrid(4);
+
+            stopGrid(5);
+        }
 
         cur.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoinsRestartQuery() throws Exception {
+        startGrids(2);
+
+        client = true;
+
+        final int CLIENT_ID = 3;
+
+        Ignite clientNode = startGrid(CLIENT_ID);
+
+        client = false;
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Start iteration: " + i);
+
+            final CacheEventListener lsnr = new CacheEventListener();
+
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(lsnr);
+
+            QueryCursor<?> cur = clientNode.cache(null).query(qry);
+
+            lsnr.latch = new CountDownLatch(1);
+
+            Ignite joined1 = startGrid(4);
+
+            IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+
+            joinedCache1.put(primaryKey(joinedCache1), 1);
+
+            assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
+
+            cur.close();
 
-        lsnr.latch = new CountDownLatch(1);
+            lsnr.latch = new CountDownLatch(1);
 
-        Ignite joined2 = startGrid(5);
+            Ignite joined2 = startGrid(5);
 
-        IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+            IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
 
-        joinedCache2.put(primaryKey(joinedCache2), 2);
+            joinedCache2.put(primaryKey(joinedCache2), 2);
 
-        U.sleep(1000);
+            assertFalse("Unexpected event received.", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return 1 != lsnr.latch.getCount();
+                }
+            }, 1000));
 
-        assertEquals("Unexpected event received.", 1, lsnr.latch.getCount());
+            stopGrid(4);
+
+            stopGrid(5);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeLeft() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        final int CLIENT_ID = 3;
+
+        Ignite clnNode = startGrid(CLIENT_ID);
+
+        client = false;
+
+        IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache =
+            new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
+                int cnt = 0;
+
+                @Override public IgniteCache<Integer, Integer> apply() {
+                    ++cnt;
+
+                    return grid(CLIENT_ID).cache(null);
+                }
+            };
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clnNode.cache(null).query(qry);
+
+        boolean first = true;
+
+        int keyCnt = 1;
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Start iteration: " + i);
+
+            if (first)
+                first = false;
+            else {
+                for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+                    startGrid(srv);
+            }
+
+            lsnr.latch = new CountDownLatch(keyCnt);
+
+            for (int key = 0; key < keyCnt; key++)
+                rndCache.apply().put(key, key);
+
+            assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(),
+                lsnr.latch.await(10, SECONDS));
+
+            for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+                stopGrid(srv);
+        }
+
+        cur.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
new file mode 100644
index 0000000..a10ebc9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTxReconnectTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class IgniteCacheContinuousQueryClientTxReconnectTest extends IgniteCacheContinuousQueryClientReconnectTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return TRANSACTIONAL;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/33910f5d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6cb1a52..91dc388 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -65,6 +65,10 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
@@ -77,7 +81,9 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -160,8 +166,14 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
-        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
         suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);


Mime
View raw message