ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [6/7] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Sat, 23 Apr 2016 08:47:07 GMT
IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ff64c2a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ff64c2a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ff64c2a

Branch: refs/heads/ignite-2523-1
Commit: 2ff64c2aa8f142f2918200e5ee85e6b7e83afbfa
Parents: a6de16d
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Apr 22 18:41:58 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Apr 22 18:41:58 2016 +0300

----------------------------------------------------------------------
 .../CacheContinuousAsyncQueryExample.java       | 138 +++
 .../datagrid/CacheContinuousQueryExample.java   |  13 +-
 .../ignite/cache/query/ContinuousQuery.java     |  27 +
 .../configuration/IgniteConfiguration.java      |  32 +
 .../ignite/internal/GridKernalContext.java      |   8 +
 .../ignite/internal/GridKernalContextImpl.java  |  12 +
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  16 +-
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     |  49 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  98 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 121 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  33 +-
 .../distributed/near/GridNearAtomicCache.java   |   3 +-
 .../continuous/CacheContinuousQueryEvent.java   |   7 +
 .../continuous/CacheContinuousQueryHandler.java | 446 +++++++--
 .../CacheContinuousQueryListener.java           |   6 +-
 .../continuous/CacheContinuousQueryManager.java |  62 +-
 .../apache/ignite/lang/IgniteAsyncCallback.java | 111 +++
 .../thread/IgniteStripedThreadPoolExecutor.java | 164 +--
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +-
 ...FailoverAtomicPrimaryWriteOrderSelfTest.java |  50 +
 ...sQueryAsyncFailoverTxReplicatedSelfTest.java |  37 +
 ...eContinuousQueryAsyncFailoverTxSelfTest.java |  44 +
 ...eContinuousQueryAsyncFilterListenerTest.java | 986 +++++++++++++++++++
 ...ryFactoryAsyncFilterRandomOperationTest.java | 131 +++
 ...usQueryFactoryFilterRandomOperationTest.java | 725 ++++++++++++++
 .../CacheContinuousQueryFactoryFilterTest.java  | 714 --------------
 ...ContinuousQueryFailoverAbstractSelfTest.java |  63 +-
 .../CacheContinuousQueryLostPartitionTest.java  |  14 +
 ...ontinuousQueryOperationFromCallbackTest.java | 627 ++++++++++++
 .../CacheContinuousQueryOrderingEventTest.java  | 722 ++++++++++++++
 ...acheContinuousQueryRandomOperationsTest.java |  23 +
 .../junits/GridTestKernalContext.java           |   1 +
 .../IgniteBinaryCacheQueryTestSuite.java        |   1 -
 .../IgniteCacheQuerySelfTestSuite3.java         |  14 +-
 .../IgniteCacheQuerySelfTestSuite4.java         |   7 +
 .../cache/CacheEntryEventAsyncProbe.java        |  61 ++
 .../yardstick/cache/CacheEntryEventProbe.java   |  33 +-
 39 files changed, 4408 insertions(+), 1203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
new file mode 100644
index 0000000..4ac7ecb
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This example demonstrates the asynchronous continuous query API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheContinuousAsyncQueryExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName();
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws Exception If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache continuous query example started.");
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                int keyCnt = 20;
+
+                // These entries will be queried by initial predicate.
+                for (int i = 0; i < keyCnt; i++)
+                    cache.put(i, Integer.toString(i));
+
+                // Create new continuous query.
+                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
+                    @Override public boolean apply(Integer key, String val) {
+                        return key > 10;
+                    }
+                }));
+
+                // Callback that is called locally when update notifications are received.
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
+                            System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+                    }
+                });
+
+                // This filter will be evaluated remotely on all nodes.
+                // Entries that pass this filter will be sent to the caller.
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create() {
+                        return new CacheEntryFilter();
+                    }
+                });
+
+                // Execute query.
+                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
+                    // Iterate through existing data.
+                    for (Cache.Entry<Integer, String> e : cur)
+                        System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+
+                    // Add a few more keys and watch more query notifications.
+                    for (int i = 0; i < keyCnt; i++)
+                        cache.put(i, Integer.toString(i));
+
+                    // Wait for a while so that the callback is notified about the remaining puts.
+                    Thread.sleep(2000);
+                }
+
+                // Iterate through the entries which were updated from the filter.
+                for (int i = 0; i < 10; i++)
+                    System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']');
+            }
+            finally {
+                // Distributed cache could be removed from cluster only by #destroyCache() call.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Filter that returns {@code true} for entries whose key is greater than 10.
+     */
+    @IgniteAsyncCallback
+    private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> {
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e)
+            throws CacheEntryListenerException {
+            // This cache operation is safe because the filter is annotated with @IgniteAsyncCallback.
+            if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue()))
+                ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10");
+
+            return e.getKey() > 10;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index 59759af..aad5b5d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,12 +18,13 @@
 package org.apache.ignite.examples.datagrid;
 
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
@@ -81,9 +82,13 @@ public class CacheContinuousQueryExample {
 
                 // This filter will be evaluated remotely on all nodes.
                 // Entry that pass this filter will be sent to the caller.
-                qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() {
-                    @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
-                        return e.getKey() > 10;
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create() {
+                        return new CacheEntryEventFilter<Integer, String>() {
+                            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+                                return e.getKey() > 10;
+                            }
+                        };
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 3ea8f93..bbfe8cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -23,6 +23,8 @@ import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 
 /**
  * API for configuring continuous cache queries.
@@ -92,6 +94,16 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
  * Note that this works even if you didn't provide initial query. Cursor will
  * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
  * is called.
+ * <p>
+ * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
+ * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
+ * (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
+ * If the filter and/or the listener is annotated with {@link IgniteAsyncCallback}, the annotated callback
+ * is executed in the async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and the notification order is kept the same as the update order for a given cache key.
+ *
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
  */
 public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** */
@@ -173,9 +185,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
      * synchronization or transactional cache operations), should be executed asynchronously without
      * blocking the thread that called the callback. Otherwise, you can get deadlocks.
+     * <p>
+     * If the local listener is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
      *
      * @param locLsnr Local callback.
      * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
         this.locLsnr = locLsnr;
@@ -198,11 +215,16 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
      *
      * @param rmtFilter Key-value filter.
      * @return {@code this} for chaining.
      *
      * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     @Deprecated
     public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
@@ -227,9 +249,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     * If the remote filter is annotated with {@link IgniteAsyncCallback} then it is executed in the async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}), which allows it to perform cache operations.
      *
      * @param rmtFilterFactory Key-value filter factory.
      * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     public ContinuousQuery<K, V> setRemoteFilterFactory(
         Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e5bd05c..19b9a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -43,6 +43,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lifecycle.LifecycleBean;
@@ -223,6 +224,9 @@ public class IgniteConfiguration {
     /** Public pool size. */
     private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
+    /** Async Callback pool size. */
+    private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
+
     /** System pool size. */
     private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -723,6 +727,20 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing asynchronous callbacks.
+     * <p>
+     * This pool is used for callbacks annotated with {@link IgniteAsyncCallback}.
+     * <p>
+     * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}.
+     *
+     * @return Thread pool size to be used.
+     * @see IgniteAsyncCallback
+     */
+    public int getAsyncCallbackPoolSize() {
+        return callbackPoolSize;
+    }
+
+    /**
      * Size of thread pool that is in charge of processing internal and Visor
      * {@link ComputeJob GridJobs}.
      * <p>
@@ -831,6 +849,20 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets async callback thread pool size to use within grid.
+     *
+     * @param poolSize Thread pool size to use within grid.
+     * @return {@code this} for chaining.
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see IgniteAsyncCallback
+     */
+    public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) {
+        this.callbackPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets management thread pool size to use within grid.
      *
      * @param poolSize Thread pool size to use within grid.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index b95d595..3eaef1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 
 /**
  *
@@ -298,6 +299,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService marshallerCachePool();
 
     /**
+     * Gets async callback pool.
+     *
+     * @return Async callback pool.
+     */
+    public IgniteStripedThreadPoolExecutor asyncCallbackPool();
+
+    /**
      * Gets cache object processor.
      *
      * @return Cache object processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 753dbe8..79d67df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
@@ -305,6 +306,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected IgniteStripedThreadPoolExecutor callbackExecSvc;
+
+    /** */
+    @GridToStringExclude
     private Map<String, Object> attrs = new HashMap<>();
 
     /** */
@@ -379,6 +384,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        IgniteStripedThreadPoolExecutor callbackExecSvc,
         List<PluginProvider> plugins) throws IgniteCheckedException {
         assert grid != null;
         assert cfg != null;
@@ -395,6 +401,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
         this.restExecSvc = restExecSvc;
+        this.callbackExecSvc = callbackExecSvc;
 
         marshCtx = new MarshallerContextImpl(plugins);
 
@@ -746,6 +753,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
+        return callbackExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCacheObjectProcessor cacheObjects() {
         return cacheObjProc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 20795fc..d6655d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,7 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiVersionCheckException;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
@@ -667,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        IgniteStripedThreadPoolExecutor callbackExecSvc,
         GridAbsClosure errHnd)
         throws IgniteCheckedException
     {
@@ -771,6 +773,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 mgmtExecSvc,
                 igfsExecSvc,
                 restExecSvc,
+                callbackExecSvc,
                 plugins);
 
             cfg.getMarshaller().setContext(ctx.marshallerContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 9a83826..f47b7ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -95,6 +95,7 @@ import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
 import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi;
 import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
 import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
@@ -1473,6 +1474,9 @@ public class IgnitionEx {
         /** Marshaller cache executor service. */
         private ExecutorService marshCacheExecSvc;
 
+        /** Async callback executor service. */
+        private IgniteStripedThreadPoolExecutor callbackExecSvc;
+
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
 
@@ -1685,6 +1689,12 @@ public class IgnitionEx {
                 0,
                 new LinkedBlockingQueue<Runnable>());
 
+            // Note that we do not pre-start threads here as this pool may not be needed.
+            callbackExecSvc = new IgniteStripedThreadPoolExecutor(
+                cfg.getAsyncCallbackPoolSize(),
+                cfg.getGridName(),
+                "callback");
+
             if (myCfg.getConnectorConfiguration() != null) {
                 restExecSvc = new IgniteThreadPoolExecutor(
                     "rest",
@@ -1724,7 +1734,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc,
+                    igfsExecSvc, restExecSvc, callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2327,6 +2337,10 @@ public class IgnitionEx {
             U.shutdownNow(getClass(), marshCacheExecSvc, log);
 
             marshCacheExecSvc = null;
+
+            U.shutdownNow(getClass(), callbackExecSvc, log);
+
+            callbackExecSvc = null;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index d6d7335..e679dfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -462,6 +463,7 @@ public interface GridCacheEntryEx {
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
      * @param updateCntr Update counter.
+     * @param fut DHT atomic update future.
      * @return Tuple where first value is flag showing whether operation succeeded,
      *      second value is old entry value if return value is requested, third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -497,7 +499,8 @@ public interface GridCacheEntryEx {
         @Nullable UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 735e20a..75d96d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1248,6 +1249,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     tx.local(),
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1445,6 +1447,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     tx.local(),
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1821,6 +1824,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     true,
                     false,
                     updateCntr,
+                    null,
                     AffinityTopologyVersion.NONE);
             }
 
@@ -1870,7 +1874,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable final UUID subjId,
         final String taskName,
         @Nullable final CacheObject prevVal,
-        @Nullable final Long updateCntr
+        @Nullable final Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1899,7 +1904,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Long updateCntr0 = null;
 
         synchronized (this) {
-            boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
+
+            boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
+                || !F.isEmptyOrNulls(filter);
 
             checkObsolete();
 
@@ -2093,6 +2103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                 primary,
                                 false,
                                 updateCntr0,
+                                null,
                                 topVer);
                         }
 
@@ -2501,13 +2512,42 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
+            // Continuous query filter should be performed under lock.
+            if (lsnrs != null) {
+                CacheObject evtVal = updated;
+                CacheObject evtOldVal = oldVal;
+
+                if (isOffHeapValuesOnly()) {
+                    evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal));
+
+                    evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal));
+                }
+
+                cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
+                    partition(), primary, false, updateCntr0, fut, topVer);
+            }
+
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0));
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        key0,
+                        updated,
+                        updated0,
+                        keepBinary,
+                        updateCntr0));
                 else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0));
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        key0,
+                        oldVal,
+                        old0,
+                        keepBinary,
+                        updateCntr0));
 
                 if (interceptRes != null)
                     oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
@@ -3302,6 +3342,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         true,
                         preload,
                         updateCntr,
+                        null,
                         topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 013184b..d28aaaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -2141,10 +2140,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             KeyCacheObject k = keys.get(i);
@@ -2159,14 +2154,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (entry == null)
                     continue;
 
-                if (!initLsnrs) {
-                    internal = entry.isInternal() || !context().userCache();
-
-                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                    initLsnrs = true;
-                }
-
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2195,7 +2182,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    lsnrs != null || sndPrevVal || req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     true,
@@ -2213,7 +2200,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.subjectId(),
                     taskName,
                     null,
-                    null);
+                    null,
+                    dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -2222,8 +2210,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (dhtFut != null) {
-                    dhtFut.listeners(lsnrs);
-
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
@@ -2260,19 +2246,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (lsnrs != null && updRes.updateCounter() != 0) {
-                    ctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        updRes.newValue(),
-                        updRes.oldValue(),
-                        internal,
-                        entry.partition(),
-                        primary,
-                        false,
-                        updRes.updateCounter(),
-                        topVer);
-                }
 
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
@@ -2446,9 +2419,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
-            boolean initLsnrs = false;
-            Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2482,14 +2452,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
-                    if (!initLsnrs) {
-                        lsnrs = ctx.continuousQueries().updateListeners(
-                            entry.isInternal() || !context().userCache(),
-                            false);
-
-                        initLsnrs = true;
-                    }
-
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
@@ -2499,7 +2461,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/sndPrevVal || lsnrs != null,
+                        /*retval*/sndPrevVal,
                         req.keepBinary(),
                         expiry,
                         /*event*/true,
@@ -2517,7 +2479,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         req.subjectId(),
                         taskName,
                         null,
-                        null);
+                        null,
+                        dhtFut);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2548,12 +2511,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        dhtFut.listeners(lsnrs);
-
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        if (!batchRes.readersOnly())
+                        if (!batchRes.readersOnly()) {
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
                                 entryProcessor,
@@ -2563,6 +2524,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 sndPrevVal,
                                 updRes.oldValue(),
                                 updRes.updateCounter());
+                        }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
@@ -2572,19 +2534,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
-                    else if (lsnrs != null && updRes.updateCounter() != 0) {
-                        ctx.continuousQueries().onEntryUpdated(
-                            lsnrs,
-                            entry.key(),
-                            updRes.newValue(),
-                            updRes.oldValue(),
-                            entry.isInternal() || !context().userCache(),
-                            entry.partition(),
-                            primary,
-                            false,
-                            updRes.updateCounter(),
-                            topVer);
-                    }
 
                     if (hasNear) {
                         if (primary) {
@@ -2965,10 +2914,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject key = req.key(i);
 
@@ -2991,14 +2936,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
-                        if (!initLsnrs) {
-                            internal = entry.isInternal() || !context().userCache();
-
-                            lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                            initLsnrs = true;
-                        }
-
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -3008,7 +2945,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*read-through*/false,
-                            /*retval*/lsnrs != null,
+                            /*retval*/false,
                             req.keepBinary(),
                             /*expiry policy*/null,
                             /*event*/true,
@@ -3026,25 +2963,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName,
                             prevVal,
-                            updateIdx);
+                            updateIdx,
+                            null);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.updateCounter() != 0) {
-                            ctx.continuousQueries().onEntryUpdated(
-                                lsnrs,
-                                entry.key(),
-                                updRes.newValue(),
-                                updRes.oldValue(),
-                                internal,
-                                entry.partition(),
-                                false,
-                                false,
-                                updRes.updateCounter(),
-                                req.topologyVersion());
-                        }
-
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4721d6e..5760596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,11 +37,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,15 +97,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
 
+    /** Continuous query closures. */
+    private Collection<CI1<Boolean>> cntQryClsrs;
+
     /** */
     private final boolean waitForExchange;
 
     /** Response count. */
     private volatile int resCnt;
 
-    /** */
-    private Map<UUID, CacheContinuousQueryListener> lsnrs;
-
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -138,13 +138,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
     }
 
-    /**
-     * @param lsnrs Continuous query listeners.
-     */
-    void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
-        this.lsnrs = lsnrs;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futVer.asGridUuid();
@@ -276,27 +269,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     addPrevVal,
                     entry.partition(),
                     prevVal,
-                    updateCntr,
-                    lsnrs != null);
-            }
-            else if (lsnrs != null && dhtNodes.size() == 1) {
-                try {
-                    cctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        val,
-                        prevVal,
-                        entry.key().internal() || !cctx.userCache(),
-                        entry.partition(),
-                        true,
-                        false,
-                        updateCntr,
-                        updateReq.topologyVersion());
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
-                        + val + ", err=" + e + "]");
-                }
+                    updateCntr);
             }
         }
     }
@@ -361,77 +334,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         }
     }
 
+    /**
+     * @param clsr Continuous query closure.
+     */
+    public void addContinuousQueryClosure(CI1<Boolean> clsr){
+        assert !isDone() : this;
+
+        if (cntQryClsrs == null)
+            cntQryClsrs = new ArrayList<>(10);
+
+        cntQryClsrs.add(clsr);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
             cctx.mvcc().removeAtomicFuture(version());
 
-            if (err != null) {
-                if (!mappings.isEmpty() && lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                updateRes.addFailedKey(key, err);
+            boolean suc = err == null;
 
-                                cctx.continuousQueries().skipUpdateEvent(
-                                    lsnrs,
-                                    key,
-                                    req.partitionId(i),
-                                    req.updateCounter(i),
-                                    updateReq.topologyVersion());
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
-                else
-                    for (KeyCacheObject key : keys)
-                        updateRes.addFailedKey(key, err);
+            if (!suc) {
+                for (KeyCacheObject key : keys)
+                    updateRes.addFailedKey(key, err);
             }
-            else {
-                if (lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                try {
-                                    cctx.continuousQueries().onEntryUpdated(
-                                        lsnrs,
-                                        key,
-                                        req.value(i),
-                                        req.localPreviousValue(i),
-                                        key.internal() || !cctx.userCache(),
-                                        req.partitionId(i),
-                                        true,
-                                        false,
-                                        req.updateCounter(i),
-                                        updateReq.topologyVersion());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.warn(log, "Failed to send continuous query message. [key=" + key +
-                                        ", newVal=" + req.value(i) +
-                                        ", err=" + e + "]");
-                                }
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
+
+            if (cntQryClsrs != null) {
+                for (CI1<Boolean> clsr : cntQryClsrs)
+                    clsr.apply(suc);
             }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index c8e33c2..b5e2835 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectTransient
     private List<Integer> partIds;
 
-    /** */
-    @GridDirectTransient
-    private List<CacheObject> locPrevVals;
-
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -242,7 +238,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param partId Partition.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
-     * @param storeLocPrevVal If {@code true} stores previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -253,19 +248,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr,
-        boolean storeLocPrevVal) {
+        @Nullable Long updateCntr
+    ) {
         keys.add(key);
 
         partIds.add(partId);
 
-        if (storeLocPrevVal) {
-            if (locPrevVals == null)
-                locPrevVals = new ArrayList<>();
-
-            locPrevVals.add(prevVal);
-        }
-
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
@@ -526,16 +514,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject localPreviousValue(int idx) {
-        assert locPrevVals != null;
-
-        return locPrevVals.get(idx);
-    }
-
-    /**
-     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -1069,13 +1047,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     private void cleanup() {
         nearVals = null;
         prevVals = null;
-
-        // Do not keep values if they are not needed for continuous query notification.
-        if (locPrevVals == null) {
-            keys = null;
-            vals = null;
-            locPrevVals = null;
-        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a7481d3..3e0e392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -271,6 +270,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         subjId,
                         taskName,
                         null,
+                        null,
                         null);
 
                     if (updRes.removeVersion() != null)
@@ -372,6 +372,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName,
                             null,
+                            null,
                             null);
 
                         if (updRes.removeVersion() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index eab5dbd..db70e2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -56,6 +56,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
         return e;
     }
 
+    /**
+     * @return Partition ID.
+     */
+    public int partitionId() {
+        return e.partition();
+    }
+
     /** {@inheritDoc} */
     @Override public K getKey() {
         return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 16513b0..9ae2972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -48,6 +48,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -58,7 +59,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
@@ -66,13 +70,14 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -159,6 +164,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** */
     private transient boolean ignoreClsNotFound;
 
+    /** */
+    private transient boolean asyncCallback;
+
+    /** */
+    private transient UUID nodeId;
+
+    /** */
+    private transient UUID routineId;
+
+    /** */
+    private transient GridKernalContext ctx;
+
+    /** */
+    private transient IgniteLogger log;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -283,13 +303,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert routineId != null;
         assert ctx != null;
 
-        if (locLsnr != null)
-            ctx.resource().injectGeneric(locLsnr);
+        if (locLsnr != null) {
+            if (locLsnr instanceof JCacheQueryLocalListener) {
+                ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
+
+                asyncCallback = ((JCacheQueryLocalListener)locLsnr).async();
+            }
+            else {
+                ctx.resource().injectGeneric(locLsnr);
+
+                asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
+            }
+        }
 
         final CacheEntryEventFilter filter = getEventFilter();
 
-        if (filter != null)
-            ctx.resource().injectGeneric(filter);
+        if (filter != null) {
+            if (filter instanceof JCacheQueryRemoteFilter) {
+                if (((JCacheQueryRemoteFilter)filter).impl != null)
+                    ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
+
+                if (!asyncCallback)
+                    asyncCallback = ((JCacheQueryRemoteFilter)filter).async();
+            }
+            else {
+                ctx.resource().injectGeneric(filter);
+
+                if (!asyncCallback)
+                    asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class);
+            }
+        }
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -299,10 +342,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         rcvs = new ConcurrentHashMap<>();
 
+        this.nodeId = nodeId;
+
+        this.routineId = routineId;
+
+        this.ctx = ctx;
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         assert !skipPrimaryCheck || loc;
 
+        log = ctx.log(CacheContinuousQueryHandler.class);
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -324,15 +375,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
             }
 
-            /** {@inheritDoc} */
             @Override public boolean keepBinary() {
                 return keepBinary;
             }
 
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
-                boolean recordIgniteEvt) {
+            @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
+                boolean primary,
+                final boolean recordIgniteEvt,
+                GridDhtAtomicUpdateFuture fut) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
-                    return;
+                    return ;
 
                 final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
@@ -343,93 +395,33 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                boolean notify = !evt.entry().isFiltered();
+                if (asyncCallback) {
+                    ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure(
+                        primary,
+                        evt,
+                        recordIgniteEvt,
+                        fut);
 
-                if (notify && filter != null) {
-                    try {
-                        notify = filter.evaluate(evt);
-                    }
-                    catch (Exception e) {
-                        U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
-                    }
+                    ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                 }
-
-                try {
-                    final CacheContinuousQueryEntry entry = evt.entry();
-
-                    if (!notify)
-                        entry.markFiltered();
+                else {
+                    final boolean notify = filter(evt, primary);
 
                     if (primary || skipPrimaryCheck) {
-                        if (loc) {
-                            if (!locCache) {
-                                Collection<CacheEntryEvent<? extends K, ? extends V>> entries = handleEvent(ctx, entry);
-
-                                if (!entries.isEmpty()) {
-                                    locLsnr.onUpdated(entries);
-
-                                    if (!internal && !skipPrimaryCheck)
-                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-                                }
-                            }
-                            else {
-                                if (!entry.isFiltered())
-                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                            }
-                        }
+                        if (fut == null)
+                            onEntryUpdate(evt, notify, loc, recordIgniteEvt);
                         else {
-                            if (!entry.isFiltered())
-                                prepareEntry(cctx, nodeId, entry);
-
-                            CacheContinuousQueryEntry e = handleEntry(entry);
-
-                            if (e != null)
-                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
-                        }
-                    }
-                    else {
-                        if (!internal) {
-                            // Skip init query and expire entries.
-                            if (entry.updateCounter() != -1L) {
-                                entry.markBackup();
+                            fut.addContinuousQueryClosure(new CI1<Boolean>() {
+                                @Override public void apply(Boolean suc) {
+                                    if (!suc)
+                                        evt.entry().markFiltered();
 
-                                backupQueue.add(entry);
-                            }
+                                    onEntryUpdate(evt, notify, loc, recordIgniteEvt);
+                                }
+                            });
                         }
                     }
                 }
-                catch (ClusterTopologyCheckedException ex) {
-                    IgniteLogger log = ctx.log(getClass());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send event notification to node, node left cluster " +
-                            "[node=" + nodeId + ", err=" + ex + ']');
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                }
-
-                if (recordIgniteEvt && notify) {
-                    ctx.event().record(new CacheQueryReadEvent<>(
-                        ctx.discovery().localNode(),
-                        "Continuous query executed.",
-                        EVT_CACHE_QUERY_OBJECT_READ,
-                        CacheQueryType.CONTINUOUS.name(),
-                        cacheName,
-                        null,
-                        null,
-                        null,
-                        filter instanceof CacheEntryEventSerializableFilter ?
-                            (CacheEntryEventSerializableFilter)filter : null,
-                        null,
-                        nodeId,
-                        taskName(),
-                        evt.getKey(),
-                        evt.getValue(),
-                        evt.getOldValue(),
-                        null
-                    ));
-                }
             }
 
             @Override public void onUnregister() {
@@ -475,15 +467,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
-                boolean primary) {
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+                AffinityTopologyVersion topVer, boolean primary) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                onEntryUpdated(evt, primary, false);
+                onEntryUpdated(evt, primary, false, null);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -580,17 +572,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+    @Override public void notifyCallback(final UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        final GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;
         assert ctx != null;
 
-        Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
+        final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs;
+
+        if (entries.isEmpty())
+            return;
+
+        if (asyncCallback) {
+            IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool();
+
+            int threadId = asyncPool.threadId(entries.get(0).partition());
+
+            int startIdx = 0;
+
+            if (entries.size() != 1) {
+                for (int i = 1; i < entries.size(); i++) {
+                    int curThreadId = asyncPool.threadId(entries.get(i).partition());
+
+                    // Entries mapped to the same pool thread stay in one batch; avoid creating new collections.
+                    if (curThreadId == threadId)
+                        continue;
+
+                    final int i0 = i;
+                    final int startIdx0 = startIdx;
+
+                    asyncPool.execute(new Runnable() {
+                        @Override public void run() {
+                            notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0));
+                        }
+                    }, threadId);
+
+                    startIdx = i0;
+                    threadId = curThreadId;
+                }
+            }
+
+            final int startIdx0 = startIdx;
+
+            asyncPool.execute(new Runnable() {
+                @Override public void run() {
+                    notifyCallback0(nodeId, ctx,
+                        startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size()));
+                }
+            }, threadId);
+        }
+        else
+            notifyCallback0(nodeId, ctx, entries);
+    }
 
+    /**
+     * @param nodeId Node id.
+     * @param ctx Kernal context.
+     * @param entries Entries.
+     */
+    private void notifyCallback0(UUID nodeId,
+        final GridKernalContext ctx,
+        Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
-        Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>();
+        final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size());
 
         for (CacheContinuousQueryEntry e : entries) {
             GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -609,7 +657,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             try {
                 e.unmarshal(cctx, ldr);
 
-                entries0.addAll(handleEvent(ctx, e));
+                Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e);
+
+                if (evts != null && !evts.isEmpty())
+                    entries0.addAll(evts);
             }
             catch (IgniteCheckedException ex) {
                 if (ignoreClsNotFound)
@@ -640,8 +691,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             if (e.isFiltered())
                 return Collections.emptyList();
             else
-                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
-                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
+                return F.<CacheEntryEvent<? extends K, ? extends V>>
+                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
         }
 
         // Initial query entry or evicted entry. These events should be fired immediately.
@@ -653,7 +704,117 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
 
-        return rec.collectEntries(cctx, cache, e);
+        return rec.collectEntries(e, cctx, cache);
+    }
+
+    /**
+     * @param primary Primary.
+     * @param evt Query event.
+     * @return {@code True} if event passed filter, otherwise {@code false}.
+     */
+    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+        CacheContinuousQueryEntry entry = evt.entry();
+
+        boolean notify = !entry.isFiltered();
+
+        try {
+            if (notify && getEventFilter() != null)
+                notify = getEventFilter().evaluate(evt);
+        }
+        catch (Exception e) {
+            U.error(log, "CacheEntryEventFilter failed: " + e);
+        }
+
+        if (!notify)
+            entry.markFiltered();
+
+        if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
+            entry.markBackup();
+
+            backupQueue.add(entry);
+        }
+
+        return notify;
+    }
+
+    /**
+     * @param evt Continuous query event.
+     * @param notify Notify flag.
+     * @param loc Listener deployed on this node.
+     * @param recordIgniteEvt Record ignite event.
+     */
+    private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
+        try {
+            GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+            if (cctx == null)
+                return;
+
+            final CacheContinuousQueryEntry entry = evt.entry();
+
+            if (loc) {
+                if (!locCache) {
+                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
+
+                    if (!evts.isEmpty()) {
+                        locLsnr.onUpdated(evts);
+
+                        if (!internal && !skipPrimaryCheck)
+                            sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                    }
+                }
+                else {
+                    if (!entry.isFiltered())
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                }
+            }
+            else {
+                if (!entry.isFiltered())
+                    prepareEntry(cctx, nodeId, entry);
+
+                CacheContinuousQueryEntry e = handleEntry(entry);
+
+                if (e != null)
+                    ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+            }
+        }
+        catch (ClusterTopologyCheckedException ex) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send event notification to node, node left cluster " +
+                    "[node=" + nodeId + ", err=" + ex + ']');
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+        }
+
+        if (recordIgniteEvt && notify) {
+            ctx.event().record(new CacheQueryReadEvent<>(
+                ctx.discovery().localNode(),
+                "Continuous query executed.",
+                EVT_CACHE_QUERY_OBJECT_READ,
+                CacheQueryType.CONTINUOUS.name(),
+                cacheName,
+                null,
+                null,
+                null,
+                getEventFilter() instanceof CacheEntryEventSerializableFilter ?
+                    (CacheEntryEventSerializableFilter)getEventFilter() : null,
+                null,
+                nodeId,
+                taskName(),
+                evt.getKey(),
+                evt.getValue(),
+                evt.getOldValue(),
+                null
+            ));
+        }
+    }
+
+    /**
+     * @return Task name.
+     */
+    private String taskName() {
+        return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
     }
 
     /**
@@ -781,9 +942,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param entry Cache continuous query entry.
          * @return Collection entries which will be fired. This collection should contains only non-filtered events.
          */
-        public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(GridCacheContext cctx,
-            IgniteCache cache,
-            CacheContinuousQueryEntry entry) {
+        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+            CacheContinuousQueryEntry entry,
+            GridCacheContext cctx,
+            IgniteCache cache
+        ) {
             assert entry != null;
 
             if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
@@ -1241,6 +1404,87 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     *
+     */
+    private class ContinuousQueryAsyncClosure implements Runnable {
+        /** */
+        private final CacheContinuousQueryEvent<K, V> evt;
+
+        /** */
+        private final boolean primary;
+
+        /** */
+        private final boolean recordIgniteEvt;
+
+        /** */
+        private final IgniteInternalFuture<?> fut;
+
+        /**
+         * @param primary Primary flag.
+         * @param evt Event.
+         * @param recordIgniteEvt Whether to record Ignite event.
+         * @param fut Dht future.
+         */
+        ContinuousQueryAsyncClosure(
+            boolean primary,
+            CacheContinuousQueryEvent<K, V> evt,
+            boolean recordIgniteEvt,
+            IgniteInternalFuture<?> fut) {
+            this.primary = primary;
+            this.evt = evt;
+            this.recordIgniteEvt = recordIgniteEvt;
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            final boolean notify = filter(evt, primary);
+
+            if (!primary())
+                return;
+
+            if (fut == null) {
+                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+
+                return;
+            }
+
+            if (fut.isDone()) {
+                if (fut.error() != null)
+                    evt.entry().markFiltered();
+
+                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        if (f.error() != null)
+                            evt.entry().markFiltered();
+
+                        ctx.asyncCallbackPool().execute(new Runnable() {
+                            @Override public void run() {
+                                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                            }
+                        }, evt.entry().partition());
+                    }
+                });
+            }
+        }
+
+        /**
+         * @return {@code True} if event fired on this node.
+         */
+        private boolean primary() {
+            return primary || skipPrimaryCheck;
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(ContinuousQueryAsyncClosure.class, this);
+        }
+    }
+
+    /**
      * Deployable object.
      */
     protected static class DeployableObject implements Externalizable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 83ff32c..8eca81c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous query listener.
@@ -36,8 +38,10 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param evt Event
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
+     * @param fut Dht atomic future.
      */
-    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+        boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
 
     /**
      * Listener unregistered callback.


Mime
View raw message