ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [3/4] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Fri, 22 Apr 2016 15:42:49 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c01f636..9efc456 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -166,23 +168,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param key Entry key.
      * @param partId Partition id.
      * @param updCntr Updated counter.
-     * @param topVer Topology version.
-     */
-    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
-        KeyCacheObject key,
-        int partId,
-        long updCntr,
-        AffinityTopologyVersion topVer) {
-        skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer);
-    }
-
-    /**
-     * @param lsnrs Listeners to notify.
-     * @param key Entry key.
-     * @param partId Partition id.
-     * @param updCntr Updated counter.
-     * @param topVer Topology version.
      * @param primary Primary.
+     * @param topVer Topology version.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
@@ -241,6 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
+     * @param fut Dht atomic future.
      * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
@@ -253,7 +241,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
-        AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        @Nullable GridDhtAtomicUpdateFuture fut,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
         Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
 
         if (lsnrCol != null) {
@@ -267,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 primary,
                 preload,
                 updateCntr,
+                fut,
                 topVer);
         }
     }
@@ -282,6 +273,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
+     * @param fut Dht atomic future.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(
@@ -294,6 +286,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
@@ -347,7 +340,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
         }
     }
 
@@ -401,7 +394,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null);
             }
         }
     }
@@ -511,7 +504,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         return executeQuery0(
             locLsnr,
             new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -800,6 +793,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
      */
     private class JCacheQuery {
         /** */
@@ -931,9 +925,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /**
      *
      */
-    private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
+    static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
         /** */
-        private final CacheEntryListener<K, V> impl;
+        final CacheEntryListener<K, V> impl;
 
         /** */
         private final IgniteLogger log;
@@ -957,28 +951,28 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 try {
                     switch (evt.getEventType()) {
                         case CREATED:
-                            assert impl instanceof CacheEntryCreatedListener;
+                            assert impl instanceof CacheEntryCreatedListener : evt;
 
                             ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt));
 
                             break;
 
                         case UPDATED:
-                            assert impl instanceof CacheEntryUpdatedListener;
+                            assert impl instanceof CacheEntryUpdatedListener : evt;
 
                             ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt));
 
                             break;
 
                         case REMOVED:
-                            assert impl instanceof CacheEntryRemovedListener;
+                            assert impl instanceof CacheEntryRemovedListener : evt;
 
                             ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt));
 
                             break;
 
                         case EXPIRED:
-                            assert impl instanceof CacheEntryExpiredListener;
+                            assert impl instanceof CacheEntryExpiredListener : evt;
 
                             ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt));
 
@@ -1009,6 +1003,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
             return evts;
         }
+
+        /**
+         * @return {@code True} if listener should be executed in non-system thread.
+         */
+        protected boolean async() {
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
     }
 
     /**
@@ -1019,7 +1020,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         private static final long serialVersionUID = 0L;
 
         /** */
-        private CacheEntryEventFilter impl;
+        protected CacheEntryEventFilter impl;
 
         /** */
         private byte types;
@@ -1072,6 +1073,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         /**
+         * @return {@code True} if filter should be executed in non-system thread.
+         */
+        protected boolean async() {
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
+
+        /**
          * @param evtType Type.
          * @return Flag value.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
new file mode 100644
index 0000000..1e04ce6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * If a callback has this annotation, it will be executed in another thread.
+ * <p>
+ * Currently this annotation is supported for:
+ * <ol>
+ *     <li>{@link ContinuousQuery} - {@link CacheEntryUpdatedListener} and {@link CacheEntryEventFilter}.</li>
+ * </ol>
+ * <p>
+ * For example, if a {@link CacheEntryEventFilter filter} or {@link CacheEntryListener}
+ * has the annotation, its callbacks will be executed in the asyncCallback thread pool. This allows using the cache API
+ * in callbacks. This thread pool can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}.
+ * <h1 class="header">Example</h1>
+ * As an example, suppose we have a cache with {@code 'Person'} objects and we need
+ * to query all persons with a salary above 1000. Also, the remote filter will update some entries.
+ * <p>
+ * Here is the {@code Person} class:
+ * <pre name="code" class="java">
+ * public class Person {
+ *     // Name.
+ *     private String name;
+ *
+ *     // Salary.
+ *     private double salary;
+ *
+ *     ...
+ * }
+ * </pre>
+ * <p>
+ * Here is the {@code ExampleCacheEntryFilter} class:
+ * <pre name="code" class="java">
+ * &#064;IgniteAsyncCallback
+ * public class ExampleCacheEntryFilter implements CacheEntryEventFilter&lt;Integer, Person&gt; {
+ *     &#064;IgniteInstanceResource
+ *     private Ignite ignite;
+ *
+ *     // The continuous listener will be notified for persons with a salary above 1000.
+ *     // The filter increases the salary of other persons by 100. Without the &#064;IgniteAsyncCallback annotation
+ *     // this operation is not safe.
+ *     public boolean evaluate(CacheEntryEvent&lt;? extends Integer, ? extends Person&gt; evt) throws CacheEntryListenerException {
+ *         Person p = evt.getValue();
+ *
+ *         if (p.getSalary() &gt; 1000)
+ *             return true;
+ *
+ *         ignite.cache("Person").put(evt.getKey(), new Person(p.getName(), p.getSalary() + 100));
+ *
+ *         return false;
+ *     }
+ * }
+ * </pre>
+ * <p>
+ * A query with an asynchronous callback is executed as usual:
+ * <pre name="code" class="java">
+ * // Create new continuous query.
+ * ContinuousQuery&lt;Long, Person&gt; qry = new ContinuousQuery&lt;&gt;();
+ *
+ * // Callback that is called locally when update notifications are received.
+ * // It simply prints out information about all created persons.
+ * qry.setLocalListener((evts) -> {
+ *     for (CacheEntryEvent&lt;? extends Long, ? extends Person&gt; e : evts) {
+ *         Person p = e.getValue();
+ *
+ *         System.out.println(p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary());
+ *     }
+ * });
+ *
+ * // Sets remote filter.
+ * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter());
+ *
+ * // Execute query.
+ * QueryCursor&lt;Cache.Entry&lt;Long, Person&gt;&gt; cur = cache.query(qry);
+ * </pre>
+ *
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize
+ * @see ContinuousQuery#getRemoteFilterFactory()
+ * @see ContinuousQuery#getLocalListener()
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface IgniteAsyncCallback {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 35882b9..9f7c381 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -17,62 +17,63 @@
 
 package org.apache.ignite.thread;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
  */
 public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     /** */
-    public static final int DFLT_SEG_POOL_SIZE = 8;
-
-    /** */
-    public static final int DFLT_CONCUR_LVL = 16;
-
-    /** */
     private final ExecutorService[] execs;
 
-    /** */
-    private final int segShift;
-
-    /** */
-    private final int segMask;
-
     /**
+     * Create striped thread pool.
      *
+     * @param concurrentLvl Concurrency level.
+     * @param gridName Node name.
+     * @param threadNamePrefix Thread name prefix.
      */
-    public IgniteStripedThreadPoolExecutor() {
-        execs = new ExecutorService[DFLT_CONCUR_LVL];
-
-        ThreadFactory factory = new IgniteThreadFactory(null);
-
-        for (int i = 0; i < DFLT_CONCUR_LVL; i++)
-            execs[i] = Executors.newFixedThreadPool(DFLT_SEG_POOL_SIZE, factory);
+    public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) {
+        execs = new ExecutorService[concurrentLvl];
 
-        // Find power-of-two sizes best matching arguments
-        int sshift = 0;
-        int ssize = 1;
+        ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix);
 
-        while (ssize < DFLT_CONCUR_LVL) {
-            ++sshift;
-
-            ssize <<= 1;
-        }
+        for (int i = 0; i < concurrentLvl; i++)
+            execs[i] = Executors.newSingleThreadExecutor(factory);
+    }
 
-        segShift = 32 - sshift;
-        segMask = ssize - 1;
+    /**
+     * Executes the given command at some time in the future. Commands with the same {@code idx}
+     * will be executed in the same thread.
+     *
+     * @param task the runnable task
+     * @param idx Striped index.
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution.
+     * @throws NullPointerException If command is null
+     */
+    public void execute(Runnable task, int idx) {
+        execs[threadId(idx)].execute(task);
+    }
 
+    /**
+     * @param idx Index.
+     * @return Striped thread ID.
+     */
+    public int threadId(int idx) {
+        return idx < execs.length ? idx : idx % execs.length;
     }
 
     /** {@inheritDoc} */
@@ -83,7 +84,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
 
     /** {@inheritDoc} */
     @Override public List<Runnable> shutdownNow() {
-        List<Runnable> res = new LinkedList<>();
+        if (execs.length == 0)
+            return Collections.emptyList();
+
+        List<Runnable> res = new ArrayList<>(execs.length);
 
         for (ExecutorService exec : execs) {
             for (Runnable r : exec.shutdownNow())
@@ -124,105 +128,45 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Callable<T> task) {
-        return execForTask(task).submit(task);
+    @NotNull @Override public <T> Future<T> submit(Callable<T> task) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Runnable task, T result) {
-        return execForTask(task).submit(task, result);
+    @NotNull @Override public <T> Future<T> submit(Runnable task, T res) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> submit(Runnable task) {
-        return execForTask(task).submit(task);
+    @NotNull @Override public Future<?> submit(Runnable task) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        List<Future<T>> futs = new LinkedList<>();
-
-        for (Callable<T> task : tasks)
-            futs.add(execForTask(task).submit(task));
-
-        boolean done = false;
-
-        try {
-            for (Future<T> fut : futs) {
-                try {
-                    fut.get();
-                }
-                catch (ExecutionException | InterruptedException ignored) {
-                    // No-op.
-                }
-            }
-
-            done = true;
-
-            return futs;
-        }
-        finally {
-            if (!done) {
-                for (Future<T> fut : futs)
-                    fut.cancel(true);
-            }
-        }
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
-        TimeUnit unit) throws InterruptedException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+        long timeout,
+        TimeUnit unit) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
-        ExecutionException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        throw new RuntimeException("Not implemented.");
+    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void execute(Runnable cmd) {
-        execForTask(cmd).execute(cmd);
-    }
-
-    /**
-     * Applies a supplemental hash function to a given hashCode, which
-     * defends against poor quality hash functions.  This is critical
-     * because ConcurrentHashMap uses power-of-two length hash tables,
-     * that otherwise encounter collisions for hashCodes that do not
-     * differ in lower or upper bits.
-     *
-     * @param h Hash code.
-     * @return Enhanced hash code.
-     */
-    private int hash(int h) {
-        // Spread bits to regularize both segment and index locations,
-        // using variant of single-word Wang/Jenkins hash.
-        h += (h <<  15) ^ 0xffffcd7d;
-        h ^= (h >>> 10);
-        h += (h <<   3);
-        h ^= (h >>>  6);
-        h += (h <<   2) + (h << 14);
-        return h ^ (h >>> 16);
-    }
-
-    /**
-     * @param cmd Command.
-     * @return Service.
-     */
-    private <T> ExecutorService execForTask(T cmd) {
-        assert cmd != null;
-
-        //return execs[ThreadLocalRandom8.current().nextInt(DFLT_CONCUR_LVL)];
-        return execs[(hash(cmd.hashCode()) >>> segShift) & segMask];
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6e404b4..a1153cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -540,7 +541,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr) throws IgniteCheckedException,
+        @Nullable Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
new file mode 100644
index 0000000..62fd984
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest
+    extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
new file mode 100644
index 0000000..4460498
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
new file mode 100644
index 0000000..8f0bd0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
new file mode 100644
index 0000000..0605bc8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -0,0 +1,986 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstractTest {
+    /** IP finder assigned to the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of nodes: {@code NODES - 1} are started first, then one more after the client flag is set. */
+    private static final int NODES = 5;
+
+    /** Number of register/update/verify iterations executed by each scenario. */
+    public static final int ITERATION_CNT = 100;
+
+    /** Value passed to {@code IgniteConfiguration.setClientMode} when a node configuration is built. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // All nodes share the same static IP finder.
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+        discoSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setClientMode(client);
+
+        // In-memory event storage with an expire count of 1000.
+        MemoryEventStorageSpi evtStorage = new MemoryEventStorageSpi();
+        evtStorage.setExpireCount(1000);
+        igniteCfg.setEventStorageSpi(evtStorage);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final int lastIdx = NODES - 1;
+
+        // Nodes [0, lastIdx) are started while the client flag is still false.
+        startGridsMultiThreaded(lastIdx);
+
+        // The node with the last index is configured with client mode enabled.
+        client = true;
+
+        startGrid(lastIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Grids are stopped first, then the parent hook runs.
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST LISTENER.
+    ///
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTx() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeap() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_VALUES cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeapValues() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    /**
+     * Listener scenario: REPLICATED, ATOMIC, ONHEAP_TIERED cache;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: REPLICATED, ATOMIC, ONHEAP_TIERED cache;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    /**
+     * Listener scenario: REPLICATED, ATOMIC cache using the {@code OFFHEAP_VALUES} memory mode;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     * <p>
+     * The memory mode matches the test name; previously {@code ONHEAP_TIERED} was passed, which made this test
+     * a duplicate of {@code testNonDeadLockInListenerReplicatedAtomic}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicOffHeap() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC cache with 2 backups using the {@code OFFHEAP_VALUES} memory mode;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     * <p>
+     * The memory mode matches the test name; previously {@code OFFHEAP_TIERED} was passed, which made this test
+     * a duplicate of {@code testNonDeadLockInListenerAtomicOffHeap}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with no backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with no backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListener() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicated() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, false);
+    }
+
+    /**
+     * Listener scenario: REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, true, true, true);
+    }
+
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST FILTER.
+    ///
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTx() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, true);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeap() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, true);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, OFFHEAP_VALUES cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapValues() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, true);
+    }
+
+    /**
+     * Filter scenario: REPLICATED, ATOMIC, ONHEAP_TIERED cache;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedAtomic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeap() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC, OFFHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, true);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC cache with 2 backups using the {@code OFFHEAP_VALUES} memory mode;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     * <p>
+     * The memory mode matches the test name; previously {@code OFFHEAP_TIERED} was passed, which made this test
+     * a duplicate of {@code testNonDeadLockInFilterAtomicOffHeap}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, ATOMIC, ONHEAP_TIERED cache with no backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache;
+     * async filter and async listener, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicated() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInFilter(ccfg, true, true, false);
+    }
+
+    /**
+     * Filter scenario: REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache;
+     * async filter and async listener, JCache listener API.
+     * <p>
+     * The last argument is {@code true} so that the listener is registered through the JCache API, as the test
+     * name states; previously {@code false} was passed, which made this test a duplicate of
+     * {@code testNonDeadLockInFilterReplicated}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
+    }
+
+    ///
+    /// ASYNC LISTENER. TEST LISTENER.
+    ///
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, TRANSACTIONAL, OFFHEAP_VALUES cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, ATOMIC, ONHEAP_TIERED cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * REPLICATED, ATOMIC, ONHEAP_TIERED cache, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, ATOMIC, OFFHEAP_TIERED cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, ATOMIC cache with 2 backups using the {@code OFFHEAP_VALUES} memory mode,
+     * {@code ContinuousQuery} API.
+     * <p>
+     * The memory mode matches the test name; previously {@code OFFHEAP_TIERED} was passed, which made this test
+     * a duplicate of {@code testNonDeadLockInFilterAtomicOffHeapSyncFilter}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, ATOMIC, ONHEAP_TIERED cache with no backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED cache with 2 backups, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Listener scenario with a non-async filter and an async listener:
+     * REPLICATED, TRANSACTIONAL, ONHEAP_TIERED cache, {@code ContinuousQuery} API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED);
+
+        testNonDeadLockInListener(ccfg, false, true, false);
+    }
+
+    /**
+     * Checks that a continuous query listener can update the cache from inside its own callback.
+     * <p>
+     * The cache is created from {@code ccfg} and destroyed at the end. Each of the {@link #ITERATION_CNT}
+     * iterations picks a node round-robin, registers a filter and a listener (through the JCache API or a
+     * {@code ContinuousQuery}), and writes {@code val0}. The listener reacts to {@code val0} by putting
+     * {@code newVal} under the same key, inside a pessimistic transaction for TRANSACTIONAL caches. The test then
+     * waits for that nested update to finish, verifies the stored value and waits until the listener has also
+     * received the {@code newVal} event.
+     *
+     * @param ccfg Cache configuration.
+     * @param asyncFltr Async filter.
+     * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for registration entry update listener.
+     * @throws Exception If failed.
+     */
+    private void testNonDeadLockInListener(CacheConfiguration ccfg,
+        final boolean asyncFltr,
+        boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Start iteration: " + i);
+
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                // The node with index NODES - 1 is the one started with the client flag; it gets a fixed key,
+                // every other node gets a key it is primary for.
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
+                // Released once the listener has committed its nested update.
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // Released once the listener has received the event produced by its nested update.
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
+
+                // Filter closure: only verifies the name of the thread it is called on.
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFltr) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+                        }
+                    };
+
+                // Listener closure: performs the nested cache update.
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                // Event caused by the nested put below.
+                                evtFromLsnrLatch.countDown();
+
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
+
+                            Transaction tx = null;
+
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                assertEquals(val, val0);
+
+                                cache0.put(key, newVal);
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
+
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
+                        }
+                    };
+
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFltr ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
+
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
+
+                    qry = cache.query(conQry);
+                }
+
+                try {
+                    // Trigger the first event either with a plain put or through an entry processor.
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else {
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
+                    }
+
+                    assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS));
+
+                    assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    // Wait on the dedicated latch: 'latch' is already released here, so waiting on it again
+                    // would never detect a missing event for the nested update.
+                    assertTrue("Failed to waiting event from listener.", U.await(evtFromLsnrLatch, 3, SECONDS));
+                }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
+
+                log.info("Iteration finished: " + i);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Checks that a continuous query remote filter can update the cache from inside its own callback.
+     * <p>
+     * The cache is created from {@code ccfg} and destroyed at the end. Each of the {@link #ITERATION_CNT}
+     * iterations picks a node round-robin, registers a filter and a listener (through the JCache API or a
+     * {@code ContinuousQuery}), and writes {@code val0}. The filter reacts to {@code val0} by putting
+     * {@code newVal} under the same key, inside a pessimistic transaction for TRANSACTIONAL caches. The listener
+     * only checks its thread name and releases the latch when it sees {@code val0}.
+     *
+     * @param ccfg Cache configuration.
+     * @param asyncFilter Async filter.
+     * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for start update listener.
+     * @throws Exception If failed.
+     */
+    private void testNonDeadLockInFilter(CacheConfiguration ccfg,
+        final boolean asyncFilter,
+        final boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Start iteration: " + i);
+
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                // The node with index NODES - 1 is the one started with the client flag; it gets a fixed key,
+                // every other node gets a key it is primary for.
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
+                final CountDownLatch latch = new CountDownLatch(1);
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
+
+                // Filter closure: verifies its thread name and performs the nested cache update.
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFilter) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                // Event caused by the nested put below.
+                                evtFromLsnrLatch.countDown();
+
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
+
+                            Transaction tx = null;
+
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class)
+                                    .getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                assertEquals(val, val0);
+
+                                cache0.put(key, newVal);
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
+
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
+                        }
+                    };
+
+                // Listener closure: verifies its thread name and releases the latch on the val0 event.
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncLsnr) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null || !val.equals(new QueryTestValue(1)))
+                                return;
+
+                            assertEquals(val, val0);
+
+                            latch.countDown();
+                        }
+                    };
+
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFilter ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
+
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
+
+                    qry = cache.query(conQry);
+                }
+
+                try {
+                    // Trigger the first event either with a plain put or through an entry processor.
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else {
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
+                    }
+
+                    // A test-framework assertion is used instead of the 'assert' keyword so that the check
+                    // is performed even when JVM assertions are disabled.
+                    assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS));
+
+                    assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    // NOTE(review): 'latch' is already released at this point, so this second wait cannot fail.
+                    // evtFromLsnrLatch is counted down only by the filter closure; whether that closure is the
+                    // instance created here or a copy on another node depends on where the filter is evaluated.
+                    // Confirm before switching this wait to evtFromLsnrLatch.
+                    assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS));
+                }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
+
+                log.info("Iteration finished: " + i);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Searches key ids {@code 0..9999} for the first one whose primary node is the local node of the given cache.
+     *
+     * @param cache Ignite cache.
+     * @return Key.
+     */
+    private QueryTestKey affinityKey(IgniteCache cache) {
+        Affinity aff = affinity(cache);
+
+        int id = 0;
+
+        while (id < 10_000) {
+            QueryTestKey candidate = new QueryTestKey(id);
+
+            if (aff.isPrimary(localNode(cache), candidate))
+                return candidate;
+
+            id++;
+        }
+
+        throw new IgniteException("Failed to found primary key.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // 15 seconds, expressed in milliseconds.
+        return SECONDS.toMillis(15);
+    }
+
+    /**
+     * Variant of {@link CacheTestRemoteFilter} annotated with {@link IgniteAsyncCallback}.
+     */
+    @IgniteAsyncCallback
+    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter {
+        /**
+         * @param clsr Closure the filter delegates to.
+         */
+        public CacheTestRemoteFilterAsync(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
+    /**
+     * Remote filter that hands every event to a closure and accepts all of them.
+     */
+    private static class CacheTestRemoteFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
+        /** Ignite instance, marked for resource injection. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure invoked for each evaluated event. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure the filter delegates to.
+         */
+        public CacheTestRemoteFilter(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt)
+            throws CacheEntryListenerException {
+            clsr.apply(ignite, evt);
+
+            // Every event passes the filter.
+            return true;
+        }
+    }
+
+    /**
+     * Variant of {@link CacheInvokeListener} annotated with {@link IgniteAsyncCallback}.
+     */
+    @IgniteAsyncCallback
+    private static class CacheInvokeListenerAsync extends CacheInvokeListener {
+        /**
+         * @param clsr Closure.
+         */
+        public CacheInvokeListenerAsync(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
+    /**
+     * Listener that hands every created or updated event to the given closure.
+     */
+    private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, Serializable {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure applied to each created or updated event. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure.
+         */
+        public CacheInvokeListener(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                clsr.apply(ignite, e);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                clsr.apply(ignite, e);
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups (applied only when cache mode is {@code PARTITIONED}).
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration named after the arguments, with {@code FULL_SYNC} writes and {@code PRIMARY} order.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Test cache key wrapping an {@link Integer}; equality, hash code and ordering all delegate to it.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} Compares the wrapped keys directly; subtracting them could overflow and flip the sign. */
+        @Override public int compareTo(Object o) {
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Test cache value holding an {@link Integer} and its {@link String#valueOf(Object)} representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Integer value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of {@link #val1}, set in the constructor. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue)o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
new file mode 100644
index 0000000..928cfda
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Variant of the parent test whose filter factories produce filters annotated with {@link IgniteAsyncCallback}.
+ */
+public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest
+    extends CacheContinuousQueryFactoryFilterRandomOperationTest {
+    /** {@inheritDoc} */
+    @NotNull @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>>
+        createFilterFactory() {
+        return new AsyncFilterFactory();
+    }
+
+    /**
+     * Async filter that asserts its thread name, accepts per {@link #isAccepted} and fails if externalized.
+     */
+    @IgniteAsyncCallback
+    protected static class NonSerializableAsyncFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
+        /** Public no-arg constructor. */
+        public NonSerializableAsyncFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("callback-"));
+
+            assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("sys-"));
+
+            return isAccepted(evt.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /**
+         * @param val Value.
+         * @return {@code True} if value is {@code null} or its {@code val1} is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
+        }
+    }
+
+    /**
+     * Factory that creates a new {@link NonSerializableAsyncFilter} on every call.
+     */
+    protected static class AsyncFilterFactory implements Factory<NonSerializableAsyncFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableAsyncFilter create() {
+            return new NonSerializableAsyncFilter();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() {
+        return FactoryBuilder.factoryOf(NoopAsyncFilter.class);
+    }
+
+    /**
+     * Async filter that asserts its thread name and accepts every event; externalization methods are no-ops.
+     */
+    @IgniteAsyncCallback
+    protected static class NoopAsyncFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
+        /** Public no-arg constructor. */
+        public NoopAsyncFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("callback-"));
+
+            assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("sys-"));
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+}


Mime
View raw message