ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2004 Fixed review notes.
Date Tue, 05 Apr 2016 18:53:00 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 09b56444d -> 80b134e67


IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80b134e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80b134e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80b134e6

Branch: refs/heads/ignite-2004
Commit: 80b134e67942237400b17e79b27af5adf0c877fd
Parents: 09b5644
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Tue Apr 5 21:50:16 2016 +0300
Committer: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Committed: Tue Apr 5 21:50:16 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../cache/GridCacheUpdateAtomicResult.java      |   7 -
 .../continuous/CacheContinuousQueryClosure.java |   6 +-
 .../continuous/CacheContinuousQueryEvent.java   |   1 -
 .../continuous/CacheContinuousQueryHandler.java | 110 ++--
 .../CacheContinuousQueryListener.java           |   1 -
 .../thread/IgniteStripedThreadPoolExecutor.java |  11 +-
 ...eContinuousQueryAsyncFilterListenerTest.java | 392 +++++++++++---
 .../CacheContinuousQueryDeadlockTest.java       | 523 +++++++++++++++++++
 ...usQueryFactoryFilterRandomOperationTest.java |   4 +-
 .../CacheContinuousQueryOrderingEventTest.java  | 195 +++++--
 ...ridCacheContinuousQueryAbstractSelfTest.java |   8 -
 .../IgniteCacheQuerySelfTestSuite3.java         |   6 +-
 13 files changed, 1078 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c317e56..a0ae1c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
@@ -36,7 +35,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.eviction.EvictableEntry;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -2512,13 +2509,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             // Continuous query filter should be perform under lock.
             if (lsnrs != null) {
-                CacheObject evtVal = val;
+                CacheObject evtVal = updated;
                 CacheObject evtOldVal = oldVal;
 
                 if (isOffHeapValuesOnly()) {
-                    evtVal = cctx.toCacheObject(cctx.unwrapTemporary(updated));
+                    evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal));
 
-                    evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(updated0));
+                    evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal));
                 }
 
                 clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index cbd9707..a96675b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -179,13 +179,6 @@ public class GridCacheUpdateAtomicResult {
     }
 
     /**
-     * @param clsrs Closures.
-     */
-    private void continuousQueryClosures(List<CacheContinuousQueryClosure> clsrs) {
-        this.cntQryClsrs = clsrs;
-    }
-
-    /**
      * @return Continuous query closures.
      */
     public List<CacheContinuousQueryClosure> continuousQueryClosures() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
index f000b93..3fd9e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
@@ -18,16 +18,16 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 /**
- *
+ * Continuous query closure.
  */
 public interface CacheContinuousQueryClosure extends Runnable {
     /**
-     *
+     * Callback for case when future completed successfully.
      */
     public void onEntryUpdate();
 
     /**
-     *
+     * Callback for case when future completed with error.
      */
     public void skipEvent();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 2bfd53d..7b70290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import javax.cache.Cache;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b3d5028..fff8a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -42,13 +42,11 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
@@ -80,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.NotNull;
@@ -538,7 +537,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, final GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;
@@ -548,9 +547,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         final GridCacheContext cctx = cacheContext(ctx);
 
-        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+        final Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
 
-        List<PartitionRecovery> recoveries = new ArrayList<>();
+        final List<PartitionRecovery> rcvs = new ArrayList<>();
 
         try {
             for (CacheContinuousQueryEntry e : entries) {
@@ -570,12 +569,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 try {
                     e.unmarshal(cctx, ldr);
 
-                    T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e);
+                    if (!asyncCallback) {
+                        T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e, false);
 
-                    if (evts.get2() != null)
-                        recoveries.add(evts.get2());
+                        if (evts.get2() != null)
+                            rcvs.add(evts.get2());
 
-                    entries0.addAll(evts.get1());
+                        entries0.addAll(evts.get1());
+                    }
                 }
                 catch (IgniteCheckedException ex) {
                     if (ignoreClsNotFound)
@@ -587,51 +588,43 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
             final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-            if (!entries0.isEmpty()) {
-                if (asyncCallback) {
-                    Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                        new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() {
-                            @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply(
-                                CacheContinuousQueryEntry e) {
-                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                            }
-                        },
-                        new IgnitePredicate<CacheContinuousQueryEntry>() {
-                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                                return !entry.isFiltered();
-                            }
-                        }
-                    );
+            if (asyncCallback) {
+                for (final CacheContinuousQueryEntry e : entries) {
+                    ctx.continuousQueryPool().execute(new Runnable() {
+                        @Override public void run() {
+                            T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts =
+                                handleEvent(ctx, e, false);
 
-                    for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) {
-                        ctx.continuousQueryPool().execute(new Runnable() {
-                            @Override public void run() {
-                                locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>singleton(e));
+                            for (CacheContinuousQueryEntry entry : evts.get1()) {
+                                CacheContinuousQueryEvent evt =
+                                    new CacheContinuousQueryEvent<>(cache, cctx, entry);
+
+                                locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>
+                                    singleton(evt));
                             }
-                        }, e.partitionId());
-                    }
+                        }
+                    }, e.partition());
                 }
-                else {
-                    Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                            @Override
-                            public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                            }
-                        },
-                        new IgnitePredicate<CacheContinuousQueryEntry>() {
-                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                                return !entry.isFiltered();
-                            }
+            }
+            else if (!entries0.isEmpty()) {
+                Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                    new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                        @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                            return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                        }
+                    },
+                    new IgnitePredicate<CacheContinuousQueryEntry>() {
+                        @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                            return !entry.isFiltered();
                         }
-                    );
+                    }
+                );
 
-                    locLsnr.onUpdated(evts);
-                }
+                locLsnr.onUpdated(evts);
             }
         }
         finally {
-            for (PartitionRecovery rec : recoveries)
+            for (PartitionRecovery rec : rcvs)
                 rec.unlock();
         }
     }
@@ -639,10 +632,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param ctx Context.
      * @param e entry.
+     * @param async Async.
      * @return Entry collection.
      */
     private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> handleEvent(GridKernalContext ctx,
-        CacheContinuousQueryEntry e) {
+        CacheContinuousQueryEntry e, boolean async) {
         assert e != null;
 
         if (internal) {
@@ -658,7 +652,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
 
-        return new T2<>(rec.collectEntries(e), rec);
+        return new T2<>(rec.collectEntries(e, async), async ? null : rec);
     }
 
     /**
@@ -788,8 +782,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param entry Cache continuous query entry.
          * @return Collection entries which will be fired.
          */
-        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
-            lock.lock();
+        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry, boolean async) {
+            if (!async)
+                lock.lock();
 
             try {
                 assert entry != null;
@@ -893,7 +888,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 return entries;
             }
             catch (Exception e) {
-                lock.unlock();
+                if (!async)
+                    lock.unlock();
 
                 throw new IgniteException("Failed to collect entries.");
             }
@@ -1352,7 +1348,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             filter();
 
             if (fireEvent || waitIfAsync())
-                onEntryUpdate();
+                onEntryUpdate0();
         }
 
         /**
@@ -1398,12 +1394,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 return;
             }
 
+            onEntryUpdate0();
+        }
+
+        /**
+         *
+         */
+        private void onEntryUpdate0() {
             try {
                 final CacheContinuousQueryEntry entry = evt.entry();
 
                 if (loc) {
                     if (!locCache) {
-                        T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events = handleEvent(ctx, entry);
+                        T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events =
+                            handleEvent(ctx, entry, asyncCallback);
 
                         try {
                             Collection<CacheContinuousQueryEntry> entries = events.get1();

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index bf1d4a4..3aefafe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 4cc6416..44ea823 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -69,7 +69,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
         ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix);
 
         for (int i = 0; i < concurrentLvl; i++)
-            execs[i] = Executors.newFixedThreadPool(poolSize, factory);
+            if (poolSize == 1)
+                execs[i] = Executors.newSingleThreadExecutor(factory);
+            else
+                execs[i] = Executors.newFixedThreadPool(poolSize, factory);
 
         // Find power-of-two sizes best matching arguments
         int sshift = 0;
@@ -161,8 +164,8 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     public void execute(Runnable task, int idx) {
         if (idx < execs.length)
             execs[idx].execute(task);
-
-        execs[idx % execs.length].execute(task);
+        else
+            execs[idx % execs.length].execute(task);
     }
 
     /** {@inheritDoc} */
@@ -248,7 +251,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     private <T> ExecutorService execForTask(T cmd) {
         assert cmd != null;
 
-        return execs[(hash(cmd.hashCode()) >>> segShift) & segMask];
+        return execs[(hash(System.identityHashCode(cmd)) >>> segShift) & segMask];
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 7958ac3..6780c18 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -18,20 +18,28 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -46,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
@@ -350,20 +359,27 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     public void testNonDeadLockInListener(CacheConfiguration ccfg,
         final boolean asyncFilter,
         boolean asyncListener) throws Exception {
-        final IgniteCache cache = ignite(0).createCache(ccfg);
+        ignite(0).createCache(ccfg);
 
-        try {
-            final QueryTestKey key = affinityKey(cache);
-
-            final QueryTestValue val0 = new QueryTestValue(1);
-            final QueryTestValue newVal = new QueryTestValue(2);
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
+        try {
             for (int i = 0; i < ITERATION_CNT; i++) {
                 log.info("Start iteration: " + i);
 
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
                 ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
 
                 final CountDownLatch latch = new CountDownLatch(1);
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
 
                 IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
@@ -381,41 +397,48 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
                 IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
-                    @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
-                        ? extends QueryTestValue> e) {
-                        IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
 
-                        QueryTestValue val = e.getValue();
+                            QueryTestValue val = e.getValue();
 
-                        if (val == null || !val.equals(new QueryTestValue(1)))
-                            return;
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                evtFromLsnrLatch.countDown();
 
-                        Transaction tx = null;
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
 
-                        try {
-                            if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
-                                tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+                            Transaction tx = null;
 
-                            assertEquals(val, val0);
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
 
-                            cache0.put(key, newVal);
+                                assertEquals(val, val0);
 
-                            if (tx != null)
-                                tx.commit();
+                                cache0.put(key, newVal);
 
-                            latch.countDown();
-                        }
-                        catch (Exception exp) {
-                            log.error("Failed: ", exp);
+                                if (tx != null)
+                                    tx.commit();
 
-                            throw new IgniteException(exp);
-                        }
-                        finally {
-                            if (tx != null)
-                                tx.close();
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
+
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
                         }
-                    }
-                };
+                    };
 
                 if (asyncListener)
                     conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr));
@@ -428,11 +451,23 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
                     conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
 
                 try (QueryCursor qry = cache.query(conQry)) {
-                    cache.put(key, val0);
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
 
-                    assert U.await(latch, 3, SECONDS) : "Failed to waiting event.";
+                    assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS));
 
                     assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    assertTrue("Failed to waiting event from listener.", U.await(latch, 3, SECONDS));
                 }
 
                 log.info("Iteration finished: " + i);
@@ -444,77 +479,270 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInListenerAtomic() throws Exception {
+        testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        final IgniteCache cache = grid(0).cache(ccfg.getName());
+
+        final QueryTestKey key = affinityKey(cache);
+
+        final QueryTestValue val0 = new QueryTestValue(1);
+        final QueryTestValue newVal = new QueryTestValue(2);
+
+        ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+            new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue> e) {
+                    IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                    QueryTestValue val = e.getValue();
+
+                    if (val == null || !val.equals(val0))
+                        return;
+
+                    Transaction tx = null;
+
+                    try {
+                        if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+                            tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                        assertEquals(val, val0);
+
+                        latch.countDown();
+
+                        cache0.put(key, newVal);
+
+                        if (tx != null)
+                            tx.commit();
+                    }
+                    catch (Exception exp) {
+                        log.error("Failed: ", exp);
+
+                        throw new IgniteException(exp);
+                    }
+                    finally {
+                        if (tx != null)
+                            tx.close();
+                    }
+                }
+            };
+
+        conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+
+        try (QueryCursor qry = cache.query(conQry)) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            cache.put(key, val0);
+
+                            return null;
+                        }
+                    });
+
+                    f.get(1, SECONDS);
+
+                    return null;
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInFilterAtomic() throws Exception {
+        testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        final IgniteCache cache = grid(0).cache(ccfg.getName());
+
+        final QueryTestKey key = affinityKey(cache);
+
+        final QueryTestValue val0 = new QueryTestValue(1);
+        final QueryTestValue newVal = new QueryTestValue(2);
+
+        ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+            new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue> e) {
+                    IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                    QueryTestValue val = e.getValue();
+
+                    if (val == null || !val.equals(val0))
+                        return;
+
+                    Transaction tx = null;
+
+                    try {
+                        if (cache0.getConfiguration(CacheConfiguration.class)
+                            .getAtomicityMode() == TRANSACTIONAL)
+                            tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                        assertEquals(val, val0);
+
+                        latch.countDown();
+
+                        cache0.put(key, newVal);
+
+                        if (tx != null)
+                            tx.commit();
+                    }
+                    catch (Exception exp) {
+                        log.error("Failed: ", exp);
+
+                        throw new IgniteException(exp);
+                    }
+                    finally {
+                        if (tx != null)
+                            tx.close();
+                    }
+                }
+            };
+
+        conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
+
+        conQry.setLocalListener(new CacheInvokeListener(
+            new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite,
+                    CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) {
+                    // No-op.
+                }
+            }));
+
+        try (QueryCursor qry = cache.query(conQry)) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            cache.put(key, val0);
+
+                            return null;
+                        }
+                    });
+
+                    f.get(1, SECONDS);
+
+                    return null;
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS));
+        }
+    }
+
+    /**
      * @param ccfg Cache configuration.
      * @param asyncFilter Async filter.
      * @param asyncListener Async listener.
      * @throws Exception If failed.
      */
-    public void testNonDeadLockInFilter(CacheConfiguration ccfg,
+    private void testNonDeadLockInFilter(CacheConfiguration ccfg,
         final boolean asyncFilter,
         final boolean asyncListener) throws Exception {
-        final IgniteCache cache = ignite(0).createCache(ccfg);
+        ignite(0).createCache(ccfg);
 
-        try {
-            final QueryTestKey key = affinityKey(cache);
-
-            final QueryTestValue val0 = new QueryTestValue(1);
-            final QueryTestValue newVal = new QueryTestValue(2);
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
+        try {
             for (int i = 0; i < ITERATION_CNT; i++) {
                 log.info("Start iteration: " + i);
 
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
                 ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
 
                 final CountDownLatch latch = new CountDownLatch(1);
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
 
                 IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
-                    @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
-                        ? extends QueryTestValue> e) {
-                        if (asyncFilter) {
-                            assertFalse("Failed: " + Thread.currentThread().getName(),
-                                Thread.currentThread().getName().contains("sys-"));
-
-                            assertTrue("Failed: " + Thread.currentThread().getName(),
-                                Thread.currentThread().getName().contains("contQry-"));
-                        }
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFilter) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
 
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("contQry-"));
+                            }
 
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
 
-                        IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+                            QueryTestValue val = e.getValue();
 
-                        QueryTestValue val = e.getValue();
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                evtFromLsnrLatch.countDown();
 
-                        if (val == null || !val.equals(new QueryTestValue(1)))
-                            return;
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
 
-                        Transaction tx = null;
+                            Transaction tx = null;
 
-                        try {
-                            if (cache0.getConfiguration(CacheConfiguration.class)
-                                .getAtomicityMode() == TRANSACTIONAL)
-                                tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class)
+                                    .getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
 
-                            assertEquals(val, val0);
+                                assertEquals(val, val0);
 
-                            cache0.put(key, newVal);
+                                cache0.put(key, newVal);
 
-                            if (tx != null)
-                                tx.commit();
+                                if (tx != null)
+                                    tx.commit();
 
-                            latch.countDown();
-                        }
-                        catch (Exception exp) {
-                            log.error("Failed: ", exp);
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
 
-                            throw new IgniteException(exp);
-                        }
-                        finally {
-                            if (tx != null)
-                                tx.close();
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
                         }
-                    }
-                };
+                    };
 
                 IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
@@ -550,11 +778,23 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
                     conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
 
                 try (QueryCursor qry = cache.query(conQry)) {
-                    cache.put(key, val0);
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
 
                     assert U.await(latch, 3, SECONDS) : "Failed to waiting event.";
 
                     assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS));
                 }
 
                 log.info("Iteration finished: " + i);
@@ -584,7 +824,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return TimeUnit.SECONDS.toMillis(10);
+        return TimeUnit.SECONDS.toMillis(15);
     }
 
     /**
@@ -769,7 +1009,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
             if (o == null || getClass() != o.getClass())
                 return false;
 
-            QueryTestValue that = (QueryTestValue) o;
+            QueryTestValue that = (QueryTestValue)o;
 
             return val1.equals(that.val1) && val2.equals(that.val2);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java
new file mode 100644
index 0000000..59d5382
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
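+/**
+ * Checks that a cache put does not complete within the timeout when a continuous query listener or remote filter updates the same key from its callback.
+ */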
+public class CacheContinuousQueryDeadlockTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
+        storeSpi.setExpireCount(1000);
+
+        cfg.setEventStorageSpi(storeSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest(); // Per-test hook delegates to the matching per-test super method.
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true; // Read by getConfiguration(): the node started next is configured in client mode.
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(); // Grids are started in beforeTest(), so they are stopped after every test.
+
+        super.afterTest(); // Per-test hook delegates to the matching per-test super method.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInListenerAtomic() throws Exception {
+        testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInListenerAtomicWithOffheap() throws Exception {
+        testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInListenerAtomicWithOffheapValues() throws Exception {
+        testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInListenerReplicatedAtomic() throws Exception {
+        testDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @param ccfg Configuration of the cache to create; the local listener registered on it puts to the same key.
+     * @throws Exception If failed.
+     */
+    private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        final IgniteCache cache = grid(0).cache(ccfg.getName());
+
+        final QueryTestKey key = affinityKey(cache);
+
+        final QueryTestValue val0 = new QueryTestValue(1);
+        final QueryTestValue newVal = new QueryTestValue(2);
+
+        ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+            new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue> e) {
+                    IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                    QueryTestValue val = e.getValue();
+
+                    if (val == null || !val.equals(val0)) // React only to the initial put of val0.
+                        return;
+
+                    Transaction tx = null; // NOTE(review): unlike testDeadLockInFilter, tx is never closed here; it stays null for the ATOMIC configs the tests pass.
+
+                    try {
+                        if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+                            tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                        assertEquals(val, val0);
+
+                        latch.countDown(); // Signals that the listener was reached, before the put below.
+
+                        cache0.put(key, newVal);
+
+                        if (tx != null)
+                            tx.commit();
+                    }
+                    catch (Exception exp) {
+                        log.error("Failed: ", exp);
+
+                        throw new IgniteException(exp);
+                    }
+                }
+            };
+
+        conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+
+        try (QueryCursor qry = cache.query(conQry)) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            cache.put(key, val0);
+
+                            return null;
+                        }
+                    });
+
+                    f.get(1, SECONDS); // Expected to time out: assertThrows requires IgniteFutureTimeoutCheckedException.
+
+                    return null;
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInFilterAtomic() throws Exception {
+        testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInFilterAtomicOffheapValues() throws Exception {
+        testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadLockInFilterReplicated() throws Exception {
+        testDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+    }
+
+    /**
+     * @param ccfg Configuration of the cache to create; the remote filter registered on it puts to the same key.
+     * @throws Exception If failed.
+     */
+    private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        final IgniteCache cache = grid(0).cache(ccfg.getName());
+
+        final QueryTestKey key = affinityKey(cache);
+
+        final QueryTestValue val0 = new QueryTestValue(1);
+        final QueryTestValue newVal = new QueryTestValue(2);
+
+        ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+            new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue> e) {
+                    IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                    QueryTestValue val = e.getValue();
+
+                    if (val == null || !val.equals(val0)) // React only to the initial put of val0.
+                        return;
+
+                    Transaction tx = null;
+
+                    try {
+                        if (cache0.getConfiguration(CacheConfiguration.class)
+                            .getAtomicityMode() == TRANSACTIONAL)
+                            tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                        assertEquals(val, val0);
+
+                        latch.countDown(); // Signals that the filter was reached, before the put below.
+
+                        cache0.put(key, newVal);
+
+                        if (tx != null)
+                            tx.commit();
+                    }
+                    catch (Exception exp) {
+                        log.error("Failed: ", exp);
+
+                        throw new IgniteException(exp);
+                    }
+                    finally {
+                        if (tx != null)
+                            tx.close();
+                    }
+                }
+            };
+
+        conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
+
+        conQry.setLocalListener(new CacheInvokeListener(
+            new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                @Override public void apply(Ignite ignite,
+                    CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) {
+                    // No-op.
+                }
+            }));
+
+        try (QueryCursor qry = cache.query(conQry)) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            cache.put(key, val0);
+
+                            return null;
+                        }
+                    });
+
+                    f.get(1, SECONDS); // Expected to time out: assertThrows requires IgniteFutureTimeoutCheckedException.
+
+                    return null;
+                }
+            }, IgniteFutureTimeoutCheckedException.class, null);
+
+            assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS));
+        }
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @return First key in the range 0..9999 for which the cache's local node is primary.
+     */
+    private QueryTestKey affinityKey(IgniteCache cache) {
+        Affinity aff = affinity(cache);
+
+        for (int i = 0; i < 10_000; i++) {
+            QueryTestKey key = new QueryTestKey(i);
+
+            if (aff.isPrimary(localNode(cache), key))
+                return key;
+        }
+
+        throw new IgniteException("Failed to found primary key.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TimeUnit.SECONDS.toMillis(30);
+    }
+
+    /**
+     * Remote filter that passes every evaluated event to the given closure and accepts all events.
+     */
+    private static class CacheTestRemoteFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
+        /** Ignite instance handed to the closure; annotated with {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure applied to each evaluated event. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure.
+         */
+        public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
+            throws CacheEntryListenerException {
+            clsr.apply(ignite, e);
+
+            return true; // Never filters anything out; the closure is run only for its side effects.
+        }
+    }
+
+    /**
+     * Update listener that applies the given closure to every received event, one at a time.
+     */
+    private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure applied to each received event. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure.
+         */
+        public CacheInvokeListener(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                clsr.apply(ignite, e);
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when {@code cacheMode} is {@code PARTITIONED}.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Test cache key wrapping an integer; equality, hash code and ordering are all based on that integer.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            return key.compareTo(((QueryTestKey)o).key); // Not subtraction: an int difference can overflow and flip the sign.
+        }
+    }
+
+    /**
+     * Test cache value holding an integer together with its string representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Integer value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of the integer value, produced with {@code String.valueOf}. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue)o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
index ec6ed4a..9800b56 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
@@ -266,7 +266,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC
         Map<Integer, Long> partCntr,
         IgniteCache<Object, Object> cache)
         throws Exception {
-        Object key = rnd.nextInt(KEYS);
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
         Object newVal = value(rnd);
         Object oldVal = expData.get(key);
 
@@ -280,7 +280,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC
             tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
 
         try {
-            log.info("Random operation [key=" + key + ", op=" + op + ']');
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
 
             switch (op) {
                 case 0: {

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
index c42533e..e728b91 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
@@ -24,9 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -39,7 +41,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -48,7 +49,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -68,7 +69,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest {
     /** */
-    public static final int LISTENER_CNT = 20;
+    public static final int LISTENER_CNT = 3;
 
     /** */
     public static final int KEYS = 10;
@@ -85,6 +86,9 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
     /** */
     private boolean client;
 
+    /** */
+    private static volatile boolean fail;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -94,7 +98,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
         cfg.setClientMode(client);
 
         MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
-        storeSpi.setExpireCount(1000);
+        storeSpi.setExpireCount(100);
 
         cfg.setEventStorageSpi(storeSpi);
 
@@ -119,6 +123,13 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
         super.afterTestsStopped();
     }
 
+    /** {@inheritDoc} Also clears the static {@code fail} flag after delegating to the superclass. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        fail = false;
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -152,7 +163,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicReplicared() throws Exception {
+    public void testAtomicReplicated() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
             CacheMemoryMode.ONHEAP_TIERED);
 
@@ -162,7 +173,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
     /**
      * @throws Exception If failed.
      */
-    public void testAtomicReplicaredOffheap() throws Exception {
+    public void testAtomicReplicatedOffheap() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
             CacheMemoryMode.OFFHEAP_TIERED);
 
@@ -199,6 +210,88 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
         doOrderingTest(ccfg, false);
     }
 
+    // Async variants: each test below calls doOrderingTest(ccfg, true).
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapTwoBackupAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.ONHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTwoBackupAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.OFFHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesTwoBackupAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.OFFHEAP_VALUES);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.ONHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedOffheapAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.OFFHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapWithoutBackupAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC,
+            CacheMemoryMode.ONHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapTwoBackupAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL,
+            CacheMemoryMode.ONHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapAsync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.TRANSACTIONAL,
+            CacheMemoryMode.ONHEAP_TIERED);
+
+        doOrderingTest(ccfg, true);
+    }
+
     /**
      * @param ccfg Cache configuration.
      * @param async Async filter.
@@ -214,28 +307,40 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
 
         try {
             List<BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>>> rcvdEvts =
-                new ArrayList<>(LISTENER_CNT);
+                new ArrayList<>(LISTENER_CNT * NODES);
 
             final AtomicInteger qryCntr = new AtomicInteger(0);
 
-            final int threadCnt = LISTENER_CNT;
+            final int threadCnt = 20;
 
-            for (int i = 0; i < LISTENER_CNT; i++) {
-                BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue =
-                    new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt);
+            for (int idx = 0; idx < NODES; idx++) {
+                for (int i = 0; i < LISTENER_CNT; i++) {
+                    BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue =
+                        new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt);
 
-                ContinuousQuery qry = new ContinuousQuery();
+                    ContinuousQuery qry = new ContinuousQuery();
 
-                qry.setLocalListener(new TestCacheEventListener(queue, qryCntr));
+                    if (async) {
+                        qry.setLocalListener(new TestCacheAsyncEventListener(queue, qryCntr));
 
-                rcvdEvts.add(queue);
+                        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                            new CacheTestRemoteFilterAsync(ccfg.getName())));
+                    }
+                    else {
+                        qry.setLocalListener(new TestCacheEventListener(queue, qryCntr));
 
-                IgniteCache<Object, Object> cache =
-                    grid(ThreadLocalRandom.current().nextInt(NODES)).cache(ccfg.getName());
+                        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                            new CacheTestRemoteFilter(ccfg.getName())));
+                    }
+
+                    rcvdEvts.add(queue);
+
+                    IgniteCache<Object, Object> cache = grid(idx).cache(ccfg.getName());
 
-                QueryCursor qryCursor = cache.query(qry);
+                    QueryCursor qryCursor = cache.query(qry);
 
-                qries.add(qryCursor);
+                    qries.add(qryCursor);
+                }
             }
 
             IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@@ -297,12 +402,14 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
 
             GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {
-                    return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT;
+                    return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT * NODES;
                 }
             }, 1000L);
 
             for (BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue : rcvdEvts)
                 checkEvents(queue, ITERATION_CNT * threadCnt);
+
+            assertFalse("Ordering invocations of filter broken.", fail);
         }
         finally {
             for (QueryCursor<?> qry : qries)
@@ -332,7 +439,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
                 assertEquals(new QueryTestValue(0), evt.getValue());
             else {
                 if (!new QueryTestValue(preVal + 1).equals(evt.getValue()))
-                    assertEquals(new QueryTestValue(preVal + 1), evt.getValue());
+                    assertEquals("Key event: " + evt.getKey(), new QueryTestValue(preVal + 1), evt.getValue());
             }
 
             vals.put(evt.getKey(), evt.getValue().val1);
@@ -354,11 +461,10 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
     @IgniteAsyncCallback
     private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter {
         /**
-         * @param clsr Closure.
+         * @param cacheName Cache name.
          */
-        public CacheTestRemoteFilterAsync(
-            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
-            super(clsr);
+        public CacheTestRemoteFilterAsync(String cacheName) {
+            super(cacheName);
         }
     }
 
@@ -368,24 +474,33 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
     private static class CacheTestRemoteFilter implements
         CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
         /** */
+        private Map<QueryTestKey, QueryTestValue> prevVals = new ConcurrentHashMap<>();
+
+        /** */
         @IgniteInstanceResource
         private Ignite ignite;
 
         /** */
-        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+        private String cacheName;
 
         /**
-         * @param clsr Closure.
+         * @param cacheName Cache name.
          */
-        public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> clsr) {
-            this.clsr = clsr;
+        public CacheTestRemoteFilter(String cacheName) {
+            this.cacheName = cacheName;
         }
 
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
             throws CacheEntryListenerException {
-            clsr.apply(ignite, e);
+            if (affinity(ignite.cache(cacheName)).isPrimary(ignite.cluster().localNode(), e.getKey())) {
+                QueryTestValue prevVal = prevVals.put(e.getKey(), e.getValue()); // Last value seen for key.
+
+                if (prevVal != null) {
+                    if (!new QueryTestValue(prevVal.val1 + 1).equals(e.getValue()))
+                        fail = true; // New value is not previous val1 + 1.
+                }
+            }
 
             return true;
         }
@@ -395,12 +510,12 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      *
      */
     @IgniteAsyncCallback
-    private static class TestCacheEventListenerAsync extends TestCacheEventListener {
+    private static class TestCacheAsyncEventListener extends TestCacheEventListener {
         /**
          * @param queue Queue.
          * @param cntr Received events counter.
          */
-        public TestCacheEventListenerAsync(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue,
+        public TestCacheAsyncEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue,
             AtomicInteger cntr) {
             super(queue, cntr);
         }
@@ -427,13 +542,27 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
         }
 
         /** {@inheritDoc} */
-        @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
             ? extends QueryTestValue>> events)
             throws CacheEntryListenerException {
+            Integer prevVal = null; // Largest val1 seen so far in this batch.
+
             for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) {
+                if (prevVal == null)
+                    prevVal = e.getValue().val1;
+
                 queue.add((CacheEntryEvent<QueryTestKey, QueryTestValue>)e);
 
                 cntr.incrementAndGet();
+
+                if (prevVal > e.getValue().val1) {
+                    int z = 0; // NOTE(review): no-op body; a decrease in val1 is detected but not reported.
+
+                    ++z;
+                }
+                else
+                    prevVal = e.getValue().val1;
+
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index fc101d4..dbe282e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
@@ -312,13 +311,6 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
         });
 
-        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, Integer>() {
-            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
-                throws CacheEntryListenerException {
-                return true;
-            }
-        });
-
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
             cache.put(1, 1);
             cache.put(2, 2);

http://git-wip-us.apache.org/repos/asf/ignite/blob/80b134e6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 912c8f9..b7d9daf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -21,9 +21,11 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryDeadlockTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
@@ -91,7 +93,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class);
-        suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class);;
+        suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class);
+        suite.addTestSuite(CacheContinuousQueryDeadlockTest.class);
+        suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);


Mime
View raw message