ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [24/27] ignite git commit: Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"
Date Wed, 17 Feb 2016 01:49:51 GMT
Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb35e1d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb35e1d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb35e1d7

Branch: refs/heads/ignite-2249
Commit: cb35e1d7eaaea470e0afca99d5de9b0aec3e58ae
Parents: 60b6f09
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Feb 16 17:44:38 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Feb 16 17:44:38 2016 +0300

----------------------------------------------------------------------
 .../cache/query/CacheQueryEntryEvent.java       |  48 +
 .../continuous/CacheContinuousQueryEvent.java   |  17 +-
 .../continuous/CacheContinuousQueryHandler.java |  95 +-
 .../CacheContinuousQueryListener.java           |   3 +-
 .../continuous/CacheContinuousQueryManager.java |  26 +-
 ...CacheContinuousQueryCounterAbstractTest.java | 613 +++++++++++++
 ...inuousQueryCounterPartitionedAtomicTest.java |  41 +
 ...ContinuousQueryCounterPartitionedTxTest.java |  41 +
 ...tinuousQueryCounterReplicatedAtomicTest.java |  41 +
 ...eContinuousQueryCounterReplicatedTxTest.java |  41 +
 ...acheContinuousQueryRandomOperationsTest.java | 896 ++++++++++++++++---
 .../IgniteCacheQuerySelfTestSuite.java          |   8 +
 12 files changed, 1652 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
new file mode 100644
index 0000000..2c1c5e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.EventType;
+
+/**
+ * A Cache continuous query entry event.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> {
+    /**
+     * Constructs a cache entry event from a given cache as source.
+     *
+     * @param source the cache that originated the event
+     * @param eventType Event type.
+     */
+    public CacheQueryEntryEvent(Cache source, EventType eventType) {
+        super(source, eventType);
+    }
+
+    /**
+     * Each cache update increases partition counter. The same cache updates have on the same value of counter
+     * on primary and backup nodes. This value can be useful to communicate with external applications.
+     *
+     * @return Value of counter for this event.
+     */
+    public abstract long getPartitionUpdateCounter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26e666..eab5dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Continuous query event.
  */
-class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
     }
 
@@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public V getOldValue() {
+    @Override public V getOldValue() {
         return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false);
     }
 
@@ -79,8 +77,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return e.updateCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> T unwrap(Class<T> cls) {
-        if(cls.isAssignableFrom(getClass()))
+        if (cls.isAssignableFrom(getClass()))
             return cls.cast(this);
 
         throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 498f37d..08fe62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -325,9 +325,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                boolean notify = true;
+                boolean notify = !evt.entry().isFiltered();
 
-                if (rmtFilter != null) {
+                if (notify && rmtFilter != null) {
                     try {
                         notify = rmtFilter.evaluate(evt);
                     }
@@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
-                try {
-                    assert evt != null;
-
-                    CacheContinuousQueryEntry e = evt.entry();
-
-                    EntryBuffer buf = entryBufs.get(e.partition());
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+                boolean primary) {
+                assert evt != null;
 
-                    if (buf == null) {
-                        buf = new EntryBuffer();
-
-                        EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
-
-                        if (oldRec != null)
-                            buf = oldRec;
-                    }
+                CacheContinuousQueryEntry e = evt.entry();
 
-                    e = buf.skipEntry(e);
-
-                    if (e != null && !ctx.localNodeId().equals(nodeId))
-                        ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
-                }
-                catch (ClusterTopologyCheckedException ex) {
-                    IgniteLogger log = ctx.log(getClass());
+                e.markFiltered();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send event notification to node, node left cluster " +
-                                "[node=" + nodeId + ", err=" + ex + ']');
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                }
+                onEntryUpdated(evt, primary, false);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         for (CacheContinuousQueryEntry e : entries)
             entries0.addAll(handleEvent(ctx, e));
 
-        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-            new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                    return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                }
-            },
-            new IgnitePredicate<CacheContinuousQueryEntry>() {
-                @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                    return !entry.isFiltered();
+        if (!entries0.isEmpty()) {
+            Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                    @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                        return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                    }
+                },
+                new IgnitePredicate<CacheContinuousQueryEntry>() {
+                    @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                        return !entry.isFiltered();
+                    }
                 }
-            }
-        );
+            );
 
-        locLsnr.onUpdated(evts);
+            locLsnr.onUpdated(evts);
+        }
     }
 
     /**
@@ -731,11 +710,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param initCntr Update counters.
          */
         public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
-            assert topVer.topologyVersion() > 0 : topVer;
-
             this.log = log;
 
             if (initCntr != null) {
+                assert topVer.topologyVersion() > 0 : topVer;
+
                 this.lastFiredEvt = initCntr;
 
                 curTop = topVer;
@@ -878,32 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         /**
-         * @param e Entry.
-         * @return Continuous query entry.
-         */
-        private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
-            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
-                e.markFiltered();
-
-                return e;
-            }
-            else {
-                buf.add(e.updateCounter());
-
-                // Double check. If another thread sent a event with counter higher than this event.
-                if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
-                    buf.remove(e.updateCounter());
-
-                    e.markFiltered();
-
-                    return e;
-                }
-                else
-                    return null;
-            }
-        }
-
-        /**
          * Add continuous entry.
          *
          * @param e Cache continuous query entry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index dce04de..83ff32c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -67,8 +67,9 @@ public interface CacheContinuousQueryListener<K, V> {
     /**
      * @param evt Event
      * @param topVer Topology version.
+     * @param primary Primary
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 968fc23..840a61b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -166,7 +166,27 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param topVer Topology version.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
-        KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+        KeyCacheObject key,
+        int partId,
+        long updCntr,
+        AffinityTopologyVersion topVer) {
+        skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer);
+    }
+
+    /**
+     * @param lsnrs Listeners to notify.
+     * @param key Entry key.
+     * @param partId Partition id.
+     * @param updCntr Updated counter.
+     * @param topVer Topology version.
+     * @param primary Primary.
+     */
+    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+        KeyCacheObject key,
+        int partId,
+        long updCntr,
+        boolean primary,
+        AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
         for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
@@ -184,7 +204,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.skipUpdateEvent(evt, topVer);
+            lsnr.skipUpdateEvent(evt, topVer, primary);
         }
     }
 
@@ -281,7 +301,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean hasOldVal = oldVal != null;
 
         if (!hasNewVal && !hasOldVal) {
-            skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
new file mode 100644
index 0000000..d8a5006
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries counter tests.
+ */
+public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest
+    implements Serializable {
+    /** */
+    protected static final String CACHE_NAME = "test_cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch timeout. */
+    protected static final long LATCH_TIMEOUT = 5000;
+
+    /** */
+    private static final String NO_CACHE_GRID_NAME = "noCacheGrid";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled());
+
+        if (gridName.equals(NO_CACHE_GRID_NAME))
+            cfg.setClientMode(true);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    @NotNull private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setAtomicityMode(atomicityMode());
+        cacheCfg.setNearConfiguration(nearConfiguration());
+        cacheCfg.setRebalanceMode(ASYNC);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     * @return Peer class loading enabled flag.
+     */
+    protected boolean peerClassLoadingEnabled() {
+        return true;
+    }
+
+    /**
+     * @return Distribution.
+     */
+    protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                for (int i = 0; i < gridCount(); i++) {
+                    if (grid(i).cluster().nodes().size() != gridCount())
+                        return false;
+                }
+
+                return true;
+            }
+        }, 3000);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).destroyCache(CACHE_NAME);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).getOrCreateCache(cacheConfiguration());
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @return Grids count.
+     */
+    protected abstract int gridCount();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEntries() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e
+                            .unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
+
+            cache.remove(2);
+
+            cache.put(1, 10);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(3, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(1, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertEquals(10, (int)vals.get(1).get1());
+            assertEquals(2L, (long)vals.get(1).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(2, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertNull(vals.get(1).get1());
+
+            vals = map.get(3);
+
+            assertNotNull(vals);
+            assertEquals(1, vals.size());
+            assertEquals(3, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoQueryListener() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+        final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME);
+
+        final AtomicInteger cntr = new AtomicInteger(0);
+        final AtomicInteger cntr1 = new AtomicInteger(0);
+
+        final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+        final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>();
+        final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>();
+
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr.incrementAndGet();
+
+                    synchronized (map1) {
+                        List<T2<Integer, Long>> vals = map1.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map1.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+                }
+            }
+        });
+
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr1.incrementAndGet();
+
+                    synchronized (map2) {
+                        List<T2<Integer, Long>> vals = map2.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map2.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME);
+
+                cache0.put(1, 1);
+                cache0.put(2, 2);
+                cache0.put(3, 3);
+
+                cache0.remove(1);
+                cache0.remove(2);
+                cache0.remove(3);
+
+                final int iter = i + 1;
+
+                assert GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return iter * 6 /* count operation */ * 2 /* count continues queries*/
+                            == (cntr.get() + cntr1.get());
+                    }
+                }, 5000L);
+
+                checkEvents(map1, i);
+
+                map1.clear();
+
+                checkEvents(map2, i);
+
+                map2.clear();
+            }
+        }
+    }
+
+    /**
+     * @param evnts Events.
+     * @param iter Iteration.
+     */
+    private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) {
+        List<T2<Integer, Long>> val = evnts.get(1);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 1
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(1L, (long)val.get(0).get1());
+
+        // Check remove 1
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(2);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 2
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(2L, (long)val.get(0).get1());
+
+        // Check remove 2
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(3);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 3
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(3L, (long)val.get(0).get1());
+
+        // Check remove 3
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        final int keyCnt = 300;
+
+        final int updateKey = 1;
+
+        for (int i = 0; i < keyCnt; i++)
+            cache.put(updateKey, i);
+
+        for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+                final AtomicInteger cntr = new AtomicInteger(0);
+
+                ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+                final List<T2<Integer, Long>> vals = new ArrayList<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                    @Override public void onUpdated(
+                        Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                            cntr.incrementAndGet();
+
+                            synchronized (vals) {
+                                vals.add(new T2<>(e.getValue(),
+                                    e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                            }
+                        }
+                    }
+                });
+
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) {
+                    for (int key = 0; key < keyCnt; key++)
+                        cache.put(updateKey, cache.get(updateKey) + 1);
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return cntr.get() == keyCnt;
+                        }
+                    }, 2000L);
+
+                    for (T2<Integer, Long> val : vals) {
+                        assertEquals(vals.size(), keyCnt);
+
+                        assertEquals((long)val.get1() + 1, (long)val.get2());
+                    }
+                }
+            }
+            else {
+                for (int key = 0; key < keyCnt; key++)
+                    cache.put(updateKey, cache.get(updateKey) + 1);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesByFilter() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(8);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getValue() % 2 == 0;
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(1, 2);
+            cache.put(1, 3);
+            cache.put(1, 4);
+
+            cache.put(2, 1);
+            cache.put(2, 2);
+            cache.put(2, 3);
+            cache.put(2, 4);
+
+            cache.remove(1);
+            cache.remove(2);
+
+            cache.put(1, 10);
+            cache.put(2, 40);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(2, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 10);
+            assertEquals(6, (long)vals.get(3).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 40);
+            assertEquals(6, (long)vals.get(3).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>();
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    map.put(e.getKey(), new T2<>(e.getValue(),
+                        e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.loadCache(null, 0);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
+
+            assertEquals(10, map.size());
+
+            for (int i = 0; i < 10; i++) {
+                assertEquals(i, (int)map.get(i).get1());
+                assertEquals((long)1, (long)map.get(i).get2());
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<CacheStore> {
+        @Override public CacheStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     * Store.
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+            for (int i = 0; i < 10; i++)
+                clo.apply(i, i);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
new file mode 100644
index 0000000..7b97928
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
new file mode 100644
index 0000000..aa42832
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
new file mode 100644
index 0000000..afa7a22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
new file mode 100644
index 0000000..4ee12de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb35e1d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index d9b2091..62ed66f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -18,7 +18,14 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,10 +38,13 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.store.CacheStore;
@@ -46,6 +56,9 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -58,6 +71,12 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  *
@@ -70,12 +89,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     private static final int NODES = 5;
 
     /** */
-    private static final int KEYS = 10;
+    private static final int KEYS = 50;
 
     /** */
     private static final int VALS = 10;
 
     /** */
+    public static final int ITERATION_CNT = 100;
+
+    /** */
     private boolean client;
 
     /** {@inheritDoc} */
@@ -110,6 +132,19 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomic() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             1,
@@ -117,7 +152,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -130,7 +178,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -143,7 +217,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -156,7 +256,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTieredAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTieredClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -169,7 +295,33 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -182,7 +334,59 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClientExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -195,7 +399,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -208,7 +425,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -221,7 +477,46 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredClientExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -234,54 +529,141 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg);
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
      * @param ccfg Cache configuration.
+     * @param deploy The place where continuous query will be started.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception {
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
         ignite(0).createCache(ccfg);
 
         try {
-            IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName());
-
             long seed = System.currentTimeMillis();
 
             Random rnd = new Random(seed);
 
             log.info("Random seed: " + seed);
 
-            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
 
-            final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
-                new ArrayBlockingQueue<>(10_000);
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                    for (CacheEntryEvent<?, ?> evt : evts) {
-                        // System.out.println("Event: " + evt);
+            if (deploy == CLIENT) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-                        evtsQueue.add(evt);
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
                     }
-                }
-            });
+                });
+
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+
+                curs.add(cur);
+            }
+            else if (deploy == SERVER) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
+                    }
+                });
+
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+
+                curs.add(cur);
+            }
+            else {
+                for (int i = 0; i < NODES - 1; i++) {
+                    ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                    final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+
+                    evtsQueues.add(evtsQueue);
 
-            QueryCursor<?> cur = cache.query(qry);
+                    QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
+
+                    curs.add(cur);
+                }
+            }
 
             ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
 
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
             try {
-                for (int i = 0; i < 1000; i++) {
-                    if (i % 100 == 0)
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 20 == 0)
                         log.info("Iteration: " + i);
 
-                    randomUpdate(rnd, evtsQueue, expData, cache);
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
                 }
             }
             finally {
-                cur.close();
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
             }
         }
         finally {
@@ -291,176 +673,389 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
     /**
      * @param rnd Random generator.
-     * @param evtsQueue Events queue.
+     * @param evtsQueues Events queue.
      * @param expData Expected cache data.
+     * @param partCntr Partition counter.
      * @param cache Cache.
      * @throws Exception If failed.
      */
     private void randomUpdate(
         Random rnd,
-        BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
         ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
         IgniteCache<Object, Object> cache)
         throws Exception {
         Object key = new QueryTestKey(rnd.nextInt(KEYS));
         Object newVal = value(rnd);
         Object oldVal = expData.get(key);
 
-        int op = rnd.nextInt(11);
+        int op = rnd.nextInt(13);
 
-        // log.info("Random operation [key=" + key + ", op=" + op + ']');
+        Ignite ignite = cache.unwrap(Ignite.class);
 
-        switch (op) {
-            case 0: {
-                cache.put(key, newVal);
+        Transaction tx = null;
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
 
-                expData.put(key, newVal);
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
 
-                break;
-            }
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
 
-            case 1: {
-                cache.getAndPut(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
 
-                break;
-            }
+                    expData.put(key, newVal);
 
-            case 2: {
-                cache.remove(key);
+                    break;
+                }
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                case 1: {
+                    cache.getAndPut(key, newVal);
 
-                expData.remove(key);
+                    if (tx != null)
+                        tx.commit();
 
-                break;
-            }
+                    updatePartitionCounter(cache, key, partCntr);
 
-            case 3: {
-                cache.getAndRemove(key);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                    expData.put(key, newVal);
 
-                expData.remove(key);
+                    break;
+                }
 
-                break;
-            }
+                case 2: {
+                    cache.remove(key);
 
-            case 4: {
-                cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+                    if (tx != null)
+                        tx.commit();
 
-                waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
 
-                break;
-            }
+                    expData.remove(key);
 
-            case 5: {
-                cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+                    break;
+                }
 
-                waitEvent(evtsQueue, key, null, oldVal);
+                case 3: {
+                    cache.getAndRemove(key);
 
-                expData.remove(key);
+                    if (tx != null)
+                        tx.commit();
 
-                break;
-            }
+                    updatePartitionCounter(cache, key, partCntr);
 
-            case 6: {
-                cache.putIfAbsent(key, newVal);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
 
-                if (oldVal == null) {
-                    waitEvent(evtsQueue, key, newVal, null);
+                    expData.remove(key);
 
-                    expData.put(key, newVal);
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
 
-            case 7: {
-                cache.getAndPutIfAbsent(key, newVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                if (oldVal == null) {
-                    waitEvent(evtsQueue, key, newVal, null);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
 
                     expData.put(key, newVal);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
 
-            case 8: {
-                cache.replace(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                if (oldVal != null) {
-                    waitEvent(evtsQueue, key, newVal, oldVal);
+                    updatePartitionCounter(cache, key, partCntr);
 
-                    expData.put(key, newVal);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
 
-            case 9: {
-                cache.getAndReplace(key, newVal);
+                    if (tx != null)
+                        tx.commit();
 
-                if (oldVal != null) {
-                    waitEvent(evtsQueue, key, newVal, oldVal);
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
 
-                    expData.put(key, newVal);
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
                 }
-                else
-                    checkNoEvent(evtsQueue);
 
-                break;
-            }
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
 
-            case 10: {
-                if (oldVal != null) {
-                    Object replaceVal = value(rnd);
+                case 8: {
+                    cache.replace(key, newVal);
 
-                    boolean success = replaceVal.equals(oldVal);
+                    if (tx != null)
+                        tx.commit();
 
-                    if (success) {
-                        cache.replace(key, replaceVal, newVal);
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
 
-                        waitEvent(evtsQueue, key, newVal, oldVal);
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
 
                         expData.put(key, newVal);
                     }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueues);
+                        }
+                    }
                     else {
-                        cache.replace(key, replaceVal, newVal);
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
 
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
                     }
+
+                    break;
                 }
-                else {
-                    cache.replace(key, value(rnd), newVal);
 
-                    checkNoEvent(evtsQueue);
+                case 11: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd));
+
+                    cache.putAll(vals);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+                    expData.putAll(vals);
+
+                    break;
                 }
 
-                break;
+                case 12: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
+
+                    cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+                    for (Object o : vals.keySet())
+                        expData.put(o, newVal);
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
             }
+        } finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     *  @param evtsQueues Queue.
+     * @param partCntrs Counters.
+     * @param aff Affinity.
+     * @param vals Values.
+     * @param expData Expected data.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        SortedMap<Object, Object> vals,
+        Map<Object, Object> expData)
+        throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
+
+            for (int i = 0; i < vals.size(); i++) {
+                CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+                rcvEvts.put(evt.getKey(), evt);
+            }
+
+            assertEquals(vals.size(), rcvEvts.size());
+
+            for (Map.Entry<Object, Object> e : vals.entrySet()) {
+                Object key = e.getKey();
+                Object val = e.getValue();
+                Object oldVal = expData.get(key);
+
+                if (val == null && oldVal == null) {
+                    checkNoEvent(evtsQueues);
 
-            default:
-                fail();
+                    continue;
+                }
+
+                CacheEntryEvent evt = rcvEvts.get(key);
+
+                assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
+                    evt);
+                assertEquals(key, evt.getKey());
+                assertEquals(val, evt.getValue());
+                assertEquals(oldVal, evt.getOldValue());
+
+                long cntr = partCntrs.get(aff.partition(key));
+                CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
+
+                assertNotNull(cntr);
+                assertNotNull(qryEntryEvt);
+
+                assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+            }
         }
     }
 
     /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
      * @param rnd Random generator.
      * @return Cache value.
      */
@@ -469,38 +1064,55 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     }
 
     /**
-     * @param evtsQueue Event queue.
+     * @param evtsQueues Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
      * @param key Key.
      * @param val Value.
      * @param oldVal Old value.
      * @throws Exception If failed.
      */
-    private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
-        Object key, Object val, Object oldVal) throws Exception {
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
         if (val == null && oldVal == null) {
-            checkNoEvent(evtsQueue);
+            checkNoEvent(evtsQueues);
 
             return;
         }
 
-        CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
 
-        assertNotNull("Failed to wait for event [key=" + key +
-            ", val=" + val +
-            ", oldVal=" + oldVal + ']', evt);
-        assertEquals(key, evt.getKey());
-        assertEquals(val, evt.getValue());
-        assertEquals(oldVal, evt.getOldValue());
+            long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+        }
     }
 
     /**
-     * @param evtsQueue Event queue.
+     * @param evtsQueues Event queue.
      * @throws Exception If failed.
      */
-    private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception {
-        CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
 
-        assertNull(evt);
+            assertNull(evt);
+        }
     }
 
     /**
@@ -564,7 +1176,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestKey implements Serializable {
+    static class QueryTestKey implements Serializable, Comparable {
         /** */
         private final Integer key;
 
@@ -597,6 +1209,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         @Override public String toString() {
             return S.toString(QueryTestKey.class, this);
         }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            return key - ((QueryTestKey)o).key;
+        }
     }
 
     /**
@@ -644,6 +1261,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             return S.toString(QueryTestValue.class, this);
         }
     }
+
     /**
      *
      */
@@ -681,4 +1299,10 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         }
     }
 
+    /**
+     *
+     */
+    protected enum ContinuousDeploy {
+        CLIENT, SERVER, ALL
+    }
 }


Mime
View raw message