ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [05/43] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Thu, 19 May 2016 09:37:38 GMT
IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9df46fc0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9df46fc0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9df46fc0

Branch: refs/heads/ignite-3163
Commit: 9df46fc0792a375c91e216a670edb3069e2f9b40
Parents: c88182c
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Apr 22 19:02:26 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Apr 22 19:02:26 2016 +0300

----------------------------------------------------------------------
 .../CacheContinuousAsyncQueryExample.java       | 138 +++
 .../datagrid/CacheContinuousQueryExample.java   |  13 +-
 .../ignite/cache/query/ContinuousQuery.java     |  27 +
 .../configuration/IgniteConfiguration.java      |  32 +
 .../ignite/internal/GridKernalContext.java      |   8 +
 .../ignite/internal/GridKernalContextImpl.java  |  12 +
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  16 +-
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     |  31 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  98 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 121 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  33 +-
 .../distributed/near/GridNearAtomicCache.java   |   2 +
 .../continuous/CacheContinuousQueryEvent.java   |   7 +
 .../continuous/CacheContinuousQueryHandler.java | 519 +++++++---
 .../CacheContinuousQueryListener.java           |   6 +-
 .../continuous/CacheContinuousQueryManager.java |  62 +-
 .../apache/ignite/lang/IgniteAsyncCallback.java | 111 +++
 .../thread/IgniteStripedThreadPoolExecutor.java | 164 +--
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +-
 ...FailoverAtomicPrimaryWriteOrderSelfTest.java |  50 +
 ...sQueryAsyncFailoverTxReplicatedSelfTest.java |  37 +
 ...eContinuousQueryAsyncFailoverTxSelfTest.java |  44 +
 ...eContinuousQueryAsyncFilterListenerTest.java | 986 +++++++++++++++++++
 ...ryFactoryAsyncFilterRandomOperationTest.java | 131 +++
 ...usQueryFactoryFilterRandomOperationTest.java | 725 ++++++++++++++
 .../CacheContinuousQueryFactoryFilterTest.java  | 714 --------------
 ...ContinuousQueryFailoverAbstractSelfTest.java |  63 +-
 .../CacheContinuousQueryLostPartitionTest.java  |  14 +
 ...ontinuousQueryOperationFromCallbackTest.java | 627 ++++++++++++
 .../CacheContinuousQueryOrderingEventTest.java  | 722 ++++++++++++++
 ...acheContinuousQueryRandomOperationsTest.java |  23 +
 .../junits/GridTestKernalContext.java           |  25 +-
 .../IgniteBinaryCacheQueryTestSuite.java        |   1 -
 .../IgniteCacheQuerySelfTestSuite3.java         |  14 +-
 .../IgniteCacheQuerySelfTestSuite4.java         |   7 +
 .../cache/CacheEntryEventAsyncProbe.java        |  61 ++
 .../yardstick/cache/CacheEntryEventProbe.java   |  33 +-
 39 files changed, 4432 insertions(+), 1257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
new file mode 100644
index 0000000..4ac7ecb
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This examples demonstrates asynchronous continuous query API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheContinuousAsyncQueryExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName();
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws Exception If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache continuous query example started.");
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                int keyCnt = 20;
+
+                // These entries will be queried by initial predicate.
+                for (int i = 0; i < keyCnt; i++)
+                    cache.put(i, Integer.toString(i));
+
+                // Create new continuous query.
+                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
+                    @Override public boolean apply(Integer key, String val) {
+                        return key > 10;
+                    }
+                }));
+
+                // Callback that is called locally when update notifications are received.
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
+                            System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+                    }
+                });
+
+                // This filter will be evaluated remotely on all nodes.
+                // Entry that pass this filter will be sent to the caller.
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create() {
+                        return new CacheEntryFilter();
+                    }
+                });
+
+                // Execute query.
+                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
+                    // Iterate through existing data.
+                    for (Cache.Entry<Integer, String> e : cur)
+                        System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
+
+                    // Add a few more keys and watch more query notifications.
+                    for (int i = 0; i < keyCnt; i++)
+                        cache.put(i, Integer.toString(i));
+
+                    // Wait for a while while callback is notified about remaining puts.
+                    Thread.sleep(2000);
+                }
+
+                // Iterate through entries which was updated from filter.
+                for (int i = 0; i < 10; i++)
+                    System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']');
+            }
+            finally {
+                // Distributed cache could be removed from cluster only by #destroyCache() call.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Filter returns {@code true} for entries which have key bigger than 10.
+     */
+    @IgniteAsyncCallback
+    private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> {
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e)
+            throws CacheEntryListenerException {
+            // This cache operation is safe because filter has Ignite AsyncCallback annotation.
+            if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue()))
+                ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10");
+
+            return e.getKey() > 10;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index 0ce6e89..fdfbc47 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,12 +18,13 @@
 package org.apache.ignite.examples.datagrid;
 
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
@@ -80,9 +81,13 @@ public class CacheContinuousQueryExample {
 
                 // This filter will be evaluated remotely on all nodes.
                 // Entry that pass this filter will be sent to the caller.
-                qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() {
-                    @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
-                        return e.getKey() > 10;
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create() {
+                        return new CacheEntryEventFilter<Integer, String>() {
+                            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+                                return e.getKey() > 10;
+                            }
+                        };
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 3ea8f93..bbfe8cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -23,6 +23,8 @@ import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 
 /**
  * API for configuring continuous cache queries.
@@ -92,6 +94,16 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
  * Note that this works even if you didn't provide initial query. Cursor will
  * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
  * is called.
+ * <p>
+ * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
+ * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
+ * (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
+ * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback
+ * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and notification order is kept the same as update order for given cache key.
+ *
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
  */
 public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** */
@@ -173,9 +185,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
      * synchronization or transactional cache operations), should be executed asynchronously without
      * blocking the thread that called the callback. Otherwise, you can get deadlocks.
+     * <p>
+     * If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in async callback pool
+     * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
      *
      * @param locLsnr Local callback.
      * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
         this.locLsnr = locLsnr;
@@ -198,11 +215,16 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     * If remote filter are annotated with {@link IgniteAsyncCallback} then it is executed in async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
      *
      * @param rmtFilter Key-value filter.
      * @return {@code this} for chaining.
      *
      * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     @Deprecated
     public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) {
@@ -227,9 +249,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     * If remote filter are annotated with {@link IgniteAsyncCallback} then it is executed in async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
      *
      * @param rmtFilterFactory Key-value filter factory.
      * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
      */
     public ContinuousQuery<K, V> setRemoteFilterFactory(
         Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index f705638..0aac562 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lifecycle.LifecycleBean;
@@ -218,6 +219,9 @@ public class IgniteConfiguration {
     /** Public pool size. */
     private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
+    /** Async Callback pool size. */
+    private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
+
     /** System pool size. */
     private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -710,6 +714,20 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing asynchronous callbacks.
+     * <p>
+     * This pool is used for callbacks annotated with {@link IgniteAsyncCallback}.
+     * <p>
+     * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}.
+     *
+     * @return Thread pool size to be used.
+     * @see IgniteAsyncCallback
+     */
+    public int getAsyncCallbackPoolSize() {
+        return callbackPoolSize;
+    }
+
+    /**
      * Size of thread pool that is in charge of processing internal and Visor
      * {@link ComputeJob GridJobs}.
      * <p>
@@ -818,6 +836,20 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets async callback thread pool size to use within grid.
+     *
+     * @param poolSize Thread pool size to use within grid.
+     * @return {@code this} for chaining.
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see IgniteAsyncCallback
+     */
+    public IgniteConfiguration setAsyncCallbackPoolSize(int poolSize) {
+        this.callbackPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets management thread pool size to use within grid.
      *
      * @param poolSize Thread pool size to use within grid.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index c0b50a2..d650c97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 
 /**
  *
@@ -297,6 +298,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService marshallerCachePool();
 
     /**
+     * Gets async callback pool.
+     *
+     * @return Async callback pool.
+     */
+    public IgniteStripedThreadPoolExecutor asyncCallbackPool();
+
+    /**
      * Gets cache object processor.
      *
      * @return Cache object processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 22fd96c..ebc2688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -88,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
@@ -300,6 +301,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected IgniteStripedThreadPoolExecutor callbackExecSvc;
+
+    /** */
+    @GridToStringExclude
     private Map<String, Object> attrs = new HashMap<>();
 
     /** */
@@ -374,6 +379,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        IgniteStripedThreadPoolExecutor callbackExecSvc,
         List<PluginProvider> plugins) throws IgniteCheckedException {
         assert grid != null;
         assert cfg != null;
@@ -390,6 +396,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
         this.restExecSvc = restExecSvc;
+        this.callbackExecSvc = callbackExecSvc;
 
         marshCtx = new MarshallerContextImpl(plugins);
 
@@ -739,6 +746,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
+        return callbackExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCacheObjectProcessor cacheObjects() {
         return cacheObjProc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 89992cf..16df367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -166,6 +166,7 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiVersionCheckException;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
@@ -657,6 +658,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        IgniteStripedThreadPoolExecutor callbackExecSvc,
         GridAbsClosure errHnd)
         throws IgniteCheckedException
     {
@@ -761,6 +763,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 mgmtExecSvc,
                 igfsExecSvc,
                 restExecSvc,
+                callbackExecSvc,
                 plugins);
 
             cfg.getMarshaller().setContext(ctx.marshallerContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5d74a6d..e2c4751 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -95,6 +95,7 @@ import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
 import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi;
 import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
 import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
@@ -1436,6 +1437,9 @@ public class IgnitionEx {
         /** Marshaller cache executor service. */
         private ExecutorService marshCacheExecSvc;
 
+        /** Continuous query executor service. */
+        private IgniteStripedThreadPoolExecutor callbackExecSvc;
+
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
 
@@ -1648,6 +1652,12 @@ public class IgnitionEx {
                 0,
                 new LinkedBlockingQueue<Runnable>());
 
+            // Note that we do not pre-start threads here as this pool may not be needed.
+            callbackExecSvc = new IgniteStripedThreadPoolExecutor(
+                cfg.getAsyncCallbackPoolSize(),
+                cfg.getGridName(),
+                "callback");
+
             if (myCfg.getConnectorConfiguration() != null) {
                 restExecSvc = new IgniteThreadPoolExecutor(
                     "rest",
@@ -1687,7 +1697,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc,
+                    igfsExecSvc, restExecSvc, callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2290,6 +2300,10 @@ public class IgnitionEx {
             U.shutdownNow(getClass(), marshCacheExecSvc, log);
 
             marshCacheExecSvc = null;
+
+            U.shutdownNow(getClass(), callbackExecSvc, log);
+
+            callbackExecSvc = null;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index dc8e08c..73a9dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -462,6 +463,7 @@ public interface GridCacheEntryEx {
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
      * @param updateCntr Update counter.
+     * @param fut Dht atomic future.
      * @return Tuple where first value is flag showing whether operation succeeded,
      *      second value is old entry value if return value is requested, third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -497,7 +499,8 @@ public interface GridCacheEntryEx {
         @Nullable UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 435d337..45be26c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1248,6 +1249,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     tx.local(),
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1445,6 +1447,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     tx.local(),
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1819,6 +1822,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     true,
                     false,
                     updateCntr,
+                    null,
                     AffinityTopologyVersion.NONE);
             }
 
@@ -1868,7 +1872,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable final UUID subjId,
         final String taskName,
         @Nullable final CacheObject prevVal,
-        @Nullable final Long updateCntr
+        @Nullable final Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1897,7 +1902,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Long updateCntr0 = null;
 
         synchronized (this) {
-            boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
+
+            boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
+                || !F.isEmptyOrNulls(filter);
 
             checkObsolete();
 
@@ -2091,6 +2101,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                 primary,
                                 false,
                                 updateCntr0,
+                                null,
                                 topVer);
                         }
 
@@ -2499,6 +2510,21 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
+            // Continuous query filter should be perform under lock.
+            if (lsnrs != null) {
+                CacheObject evtVal = updated;
+                CacheObject evtOldVal = oldVal;
+
+                if (isOffHeapValuesOnly()) {
+                    evtVal = cctx.toCacheObject(cctx.unwrapTemporary(evtVal));
+
+                    evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(evtOldVal));
+                }
+
+                cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
+                    partition(), primary, false, updateCntr0, fut, topVer);
+            }
+
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
 
             if (intercept) {
@@ -3334,6 +3360,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         true,
                         preload,
                         updateCntr,
+                        null,
                         topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cae13e8..25e4e3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -2004,10 +2003,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             KeyCacheObject k = keys.get(i);
@@ -2022,14 +2017,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (entry == null)
                     continue;
 
-                if (!initLsnrs) {
-                    internal = entry.isInternal() || !context().userCache();
-
-                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                    initLsnrs = true;
-                }
-
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2058,7 +2045,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    lsnrs != null || sndPrevVal || req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     true,
@@ -2076,7 +2063,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.subjectId(),
                     taskName,
                     null,
-                    null);
+                    null,
+                    dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -2085,8 +2073,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (dhtFut != null) {
-                    dhtFut.listeners(lsnrs);
-
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
@@ -2123,19 +2109,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (lsnrs != null && updRes.updateCounter() != 0) {
-                    ctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        updRes.newValue(),
-                        updRes.oldValue(),
-                        internal,
-                        entry.partition(),
-                        primary,
-                        false,
-                        updRes.updateCounter(),
-                        topVer);
-                }
 
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
@@ -2309,9 +2282,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
-            boolean initLsnrs = false;
-            Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2345,14 +2315,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
-                    if (!initLsnrs) {
-                        lsnrs = ctx.continuousQueries().updateListeners(
-                            entry.isInternal() || !context().userCache(),
-                            false);
-
-                        initLsnrs = true;
-                    }
-
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
@@ -2362,7 +2324,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/sndPrevVal || lsnrs != null,
+                        /*retval*/sndPrevVal,
                         req.keepBinary(),
                         expiry,
                         /*event*/true,
@@ -2380,7 +2342,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         req.subjectId(),
                         taskName,
                         null,
-                        null);
+                        null,
+                        dhtFut);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2411,12 +2374,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        dhtFut.listeners(lsnrs);
-
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        if (!batchRes.readersOnly())
+                        if (!batchRes.readersOnly()) {
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
                                 entryProcessor,
@@ -2426,6 +2387,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 sndPrevVal,
                                 updRes.oldValue(),
                                 updRes.updateCounter());
+                        }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
@@ -2435,19 +2397,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
-                    else if (lsnrs != null && updRes.updateCounter() != 0) {
-                        ctx.continuousQueries().onEntryUpdated(
-                            lsnrs,
-                            entry.key(),
-                            updRes.newValue(),
-                            updRes.oldValue(),
-                            entry.isInternal() || !context().userCache(),
-                            entry.partition(),
-                            primary,
-                            false,
-                            updRes.updateCounter(),
-                            topVer);
-                    }
 
                     if (hasNear) {
                         if (primary) {
@@ -2823,10 +2772,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject key = req.key(i);
 
@@ -2849,14 +2794,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
-                        if (!initLsnrs) {
-                            internal = entry.isInternal() || !context().userCache();
-
-                            lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                            initLsnrs = true;
-                        }
-
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -2866,7 +2803,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*read-through*/false,
-                            /*retval*/lsnrs != null,
+                            /*retval*/false,
                             req.keepBinary(),
                             /*expiry policy*/null,
                             /*event*/true,
@@ -2884,25 +2821,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName,
                             prevVal,
-                            updateIdx);
+                            updateIdx,
+                            null);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.updateCounter() != 0) {
-                            ctx.continuousQueries().onEntryUpdated(
-                                lsnrs,
-                                entry.key(),
-                                updRes.newValue(),
-                                updRes.oldValue(),
-                                internal,
-                                entry.partition(),
-                                false,
-                                false,
-                                updRes.updateCounter(),
-                                req.topologyVersion());
-                        }
-
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 07f0e14..47f6cb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,11 +37,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,15 +97,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
 
+    /** Continuous query closures. */
+    private Collection<CI1<Boolean>> cntQryClsrs;
+
     /** */
     private final boolean waitForExchange;
 
     /** Response count. */
     private volatile int resCnt;
 
-    /** */
-    private Map<UUID, CacheContinuousQueryListener> lsnrs;
-
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -140,13 +140,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !topLocked;
     }
 
-    /**
-     * @param lsnrs Continuous query listeners.
-     */
-    void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
-        this.lsnrs = lsnrs;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futVer.asGridUuid();
@@ -283,27 +276,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     addPrevVal,
                     entry.partition(),
                     prevVal,
-                    updateCntr,
-                    lsnrs != null);
-            }
-            else if (lsnrs != null && dhtNodes.size() == 1) {
-                try {
-                    cctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        val,
-                        prevVal,
-                        entry.key().internal() || !cctx.userCache(),
-                        entry.partition(),
-                        true,
-                        false,
-                        updateCntr,
-                        updateReq.topologyVersion());
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
-                        + val + ", err=" + e + "]");
-                }
+                    updateCntr);
             }
         }
     }
@@ -368,77 +341,33 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         }
     }
 
+    /**
+     * @param clsr Continuous query closure.
+     */
+    public void addContinuousQueryClosure(CI1<Boolean> clsr){
+        assert !isDone() : this;
+
+        if (cntQryClsrs == null)
+            cntQryClsrs = new ArrayList<>(10);
+
+        cntQryClsrs.add(clsr);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
             cctx.mvcc().removeAtomicFuture(version());
 
-            if (err != null) {
-                if (!mappings.isEmpty() && lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                updateRes.addFailedKey(key, err);
+            boolean suc = err == null;
 
-                                cctx.continuousQueries().skipUpdateEvent(
-                                    lsnrs,
-                                    key,
-                                    req.partitionId(i),
-                                    req.updateCounter(i),
-                                    updateReq.topologyVersion());
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
-                else
-                    for (KeyCacheObject key : keys)
-                        updateRes.addFailedKey(key, err);
+            if (!suc) {
+                for (KeyCacheObject key : keys)
+                    updateRes.addFailedKey(key, err);
             }
-            else {
-                if (lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                try {
-                                    cctx.continuousQueries().onEntryUpdated(
-                                        lsnrs,
-                                        key,
-                                        req.value(i),
-                                        req.localPreviousValue(i),
-                                        key.internal() || !cctx.userCache(),
-                                        req.partitionId(i),
-                                        true,
-                                        false,
-                                        req.updateCounter(i),
-                                        updateReq.topologyVersion());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.warn(log, "Failed to send continuous query message. [key=" + key +
-                                        ", newVal=" + req.value(i) +
-                                        ", err=" + e + "]");
-                                }
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
+
+            if (cntQryClsrs != null) {
+                for (CI1<Boolean> clsr : cntQryClsrs)
+                    clsr.apply(suc);
             }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 0f45a4b..126cd83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectTransient
     private List<Integer> partIds;
 
-    /** */
-    @GridDirectTransient
-    private List<CacheObject> locPrevVals;
-
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -242,7 +238,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param partId Partition.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
-     * @param storeLocPrevVal If {@code true} stores previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -253,19 +248,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr,
-        boolean storeLocPrevVal) {
+        @Nullable Long updateCntr
+    ) {
         keys.add(key);
 
         partIds.add(partId);
 
-        if (storeLocPrevVal) {
-            if (locPrevVals == null)
-                locPrevVals = new ArrayList<>();
-
-            locPrevVals.add(prevVal);
-        }
-
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
@@ -526,16 +514,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject localPreviousValue(int idx) {
-        assert locPrevVals != null;
-
-        return locPrevVals.get(idx);
-    }
-
-    /**
-     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -1064,13 +1042,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     private void cleanup() {
         nearVals = null;
         prevVals = null;
-
-        // Do not keep values if they are not needed for continuous query notification.
-        if (locPrevVals == null) {
-            keys = null;
-            vals = null;
-            locPrevVals = null;
-        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 28cfc15..d099613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -271,6 +271,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         subjId,
                         taskName,
                         null,
+                        null,
                         null);
 
                     if (updRes.removeVersion() != null)
@@ -372,6 +373,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName,
                             null,
+                            null,
                             null);
 
                         if (updRes.removeVersion() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index eab5dbd..db70e2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -56,6 +56,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
         return e;
     }
 
+    /**
+     * @return Partition ID.
+     */
+    public int partitionId() {
+        return e.partition();
+    }
+
     /** {@inheritDoc} */
     @Override public K getKey() {
         return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);


Mime
View raw message