ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [2/2] ignite git commit: IGNITE-2004 Added tests.
Date Fri, 01 Apr 2016 17:15:39 GMT
IGNITE-2004 Added tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/94e301d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/94e301d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/94e301d8

Branch: refs/heads/ignite-2004
Commit: 94e301d8cd06795f255784e4c903230559c754bc
Parents: 3f2c2e6
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Apr 1 20:15:31 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Apr 1 20:15:31 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/CacheAsyncCallback.java  |  36 --
 .../ignite/cache/query/ContinuousQuery.java     |   9 +-
 .../processors/cache/GridCacheMapEntry.java     |  17 +-
 .../cache/GridCacheUpdateAtomicResult.java      |  29 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  59 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 123 +---
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  35 +-
 .../continuous/CacheContinuousQueryClosure.java |  33 ++
 .../continuous/CacheContinuousQueryEvent.java   |  17 -
 .../continuous/CacheContinuousQueryHandler.java | 464 +++++++--------
 .../CacheContinuousQueryListener.java           |  16 +-
 .../continuous/CacheContinuousQueryManager.java | 166 +-----
 .../apache/ignite/lang/IgniteAsyncCallback.java |  39 ++
 ...eContinuousQueryAsyncFilterListenerTest.java |   8 +-
 ...ryFactoryAsyncFilterRandomOperationTest.java |   5 +-
 ...usQueryFactoryFilterRandomOperationTest.java |   7 +-
 .../CacheContinuousQueryOrderingEventTest.java  | 558 +++++++++++++++++++
 ...ridCacheContinuousQueryAbstractSelfTest.java |   8 +
 .../IgniteCacheQuerySelfTestSuite4.java         |   2 +-
 19 files changed, 997 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java
deleted file mode 100644
index a67b369..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheAsyncCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.query;
-
-import javax.cache.configuration.Factory;
-import javax.cache.event.CacheEntryEventFilter;
-import javax.cache.event.CacheEntryListener;
-import org.apache.ignite.configuration.IgniteConfiguration;
-
-/**
- * Marker interface. If {@link CacheEntryEventFilter filter} or {@link CacheEntryListener}
- * implementations extend this interface then they will be executing on a separate thread pool. It allows
- * to use cache API in a callbacks.
- * <p>
- * Thread pool which will be used for it can be configured by
- * {@link IgniteConfiguration#setContinuousQueryPoolSize(int)}
- *
- * @see ContinuousQuery#setRemoteFilterFactory(Factory)
- */
-public interface CacheAsyncCallback {
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 1b6c16e..cb5b05e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -23,6 +23,7 @@ import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 
 /**
  * API for configuring continuous cache queries.
@@ -174,8 +175,8 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * synchronization or transactional cache operations), should be executed asynchronously without
      * blocking the thread that called the callback. Otherwise, you can get deadlocks.
      * * <p>
-     * If listener implements {@link CacheAsyncCallback} marker interface then cache operations are allowed.
-     * see {@link CacheAsyncCallback}.
+     * If the listener implements the {@link IgniteAsyncCallback} marker interface, cache operations are allowed.
+     * See {@link IgniteAsyncCallback}.
      *
      * @param locLsnr Local callback.
      * @return {@code this} for chaining.
@@ -231,8 +232,8 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
      * <p>
-     * If filter implements {@link CacheAsyncCallback} marker interface then cache operations are allowed.
-     * see {@link CacheAsyncCallback}.
+     * If the filter implements the {@link IgniteAsyncCallback} marker interface, cache operations are allowed.
+     * See {@link IgniteAsyncCallback}.
      *
      * @param rmtFilterFactory Key-value filter factory.
      * @return {@code this} for chaining.

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2d58b15..2b81484 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
@@ -46,6 +48,8 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -1237,6 +1241,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     tx.local(),
+                    true,
                     false,
                     updateCntr0,
                     topVer);
@@ -1434,6 +1439,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     tx.local(),
+                    true,
                     false,
                     updateCntr0,
                     topVer);
@@ -1808,6 +1814,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     internal,
                     partition(),
                     true,
+                    true,
                     false,
                     updateCntr,
                     AffinityTopologyVersion.NONE);
@@ -1887,7 +1894,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Long updateCntr0 = null;
 
-        Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes = null;
+        List<CacheContinuousQueryClosure> clsrs = null;
 
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -2082,6 +2089,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                 isInternal() || !context().userCache(),
                                 partition(),
                                 primary,
+                                true,
                                 false,
                                 updateCntr0,
                                 topVer);
@@ -2512,8 +2520,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     evtOldVal = cctx.toCacheObject(cctx.unwrapTemporary(updated0));
                 }
 
-                filterRes = cctx.continuousQueries().filterEntry(lsnrs, key, evtVal, evtOldVal, partition(), false,
-                    updateCntr0, topVer);
+                clsrs = cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
+                    partition(), primary, false, false, updateCntr0, topVer);
             }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
@@ -2542,7 +2550,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             conflictCtx,
             true,
             updateCntr0 == null ? 0 : updateCntr0,
-            filterRes);
+            clsrs);
     }
 
     /**
@@ -3316,6 +3324,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         this.isInternal() || !this.context().userCache(),
                         this.partition(),
                         true,
+                        true,
                         preload,
                         updateCntr,
                         topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 11e7949..cbd9707 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Map;
+import java.util.List;
 import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -63,12 +62,12 @@ public class GridCacheUpdateAtomicResult {
     /** */
     private final long updateCntr;
 
-    /** */
-    Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> conQryFltrs;
-
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
+    /** Continuous query closures. */
+    private List<CacheContinuousQueryClosure> cntQryClsrs;
+
     /**
      * Constructor.
      *
@@ -82,7 +81,6 @@ public class GridCacheUpdateAtomicResult {
      * @param conflictRes DR resolution result.
      * @param sndToDht Whether update should be propagated to DHT node.
      * @param updateCntr Partition update counter.
-     * @param conQryFltrs Continuous query
      */
     public GridCacheUpdateAtomicResult(boolean success,
         @Nullable CacheObject oldVal,
@@ -94,7 +92,7 @@ public class GridCacheUpdateAtomicResult {
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
         boolean sndToDht,
         long updateCntr,
-        Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> conQryFltrs
+        List<CacheContinuousQueryClosure> cntQryClsrs
     ) {
         this.success = success;
         this.oldVal = oldVal;
@@ -106,7 +104,7 @@ public class GridCacheUpdateAtomicResult {
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
         this.updateCntr = updateCntr;
-        this.conQryFltrs = conQryFltrs;
+        this.cntQryClsrs = cntQryClsrs;
     }
 
     /**
@@ -181,10 +179,17 @@ public class GridCacheUpdateAtomicResult {
     }
 
     /**
-     * @return Continuous query filter results.
+     * @param clsrs Closures.
+     */
+    private void continuousQueryClosures(List<CacheContinuousQueryClosure> clsrs) {
+        this.cntQryClsrs = clsrs;
+    }
+
+    /**
+     * @return Continuous query closures.
      */
-    public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> getFilterResults() {
-        return conQryFltrs;
+    public List<CacheContinuousQueryClosure> continuousQueryClosures() {
+        return cntQryClsrs;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9e6ba0a..16a0a47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -2077,8 +2078,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (dhtFut != null) {
-                    dhtFut.listeners(lsnrs);
-
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
@@ -2099,7 +2098,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 sndPrevVal,
                                 updRes.oldValue(),
                                 updRes.updateCounter(),
-                                updRes.getFilterResults());
+                                updRes.continuousQueryClosures());
                         }
 
                         if (!F.isEmpty(filteredReaders))
@@ -2116,19 +2115,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (lsnrs != null && updRes.updateCounter() != 0) {
-                    ctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        updRes.newValue(),
-                        updRes.oldValue(),
-                        internal,
-                        entry.partition(),
-                        primary,
-                        false,
-                        updRes.updateCounter(),
-                        topVer,
-                        updRes.getFilterResults());
+                else if (lsnrs != null && updRes.continuousQueryClosures() != null) {
+                    for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
+                        clsr.onEntryUpdate();
                 }
 
                 if (hasNear) {
@@ -2405,12 +2394,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        dhtFut.listeners(lsnrs);
-
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
-                        if (!batchRes.readersOnly())
+                        if (!batchRes.readersOnly()) {
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
                                 entryProcessor,
@@ -2420,7 +2407,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 sndPrevVal,
                                 updRes.oldValue(),
                                 updRes.updateCounter(),
-                                updRes.getFilterResults());
+                                updRes.continuousQueryClosures());
+                        }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
@@ -2430,18 +2418,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
-                    else if (lsnrs != null && updRes.updateCounter() != 0) {
-                        ctx.continuousQueries().onEntryUpdated(
-                            lsnrs,
-                            entry.key(),
-                            updRes.newValue(),
-                            updRes.oldValue(),
-                            entry.isInternal() || !context().userCache(),
-                            entry.partition(),
-                            primary,
-                            false,
-                            updRes.updateCounter(),
-                            topVer);
+                    else if (lsnrs != null && updRes.continuousQueryClosures() != null) {
+                        for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
+                            clsr.onEntryUpdate();
                     }
 
                     if (hasNear) {
@@ -2884,19 +2863,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.updateCounter() != 0) {
-                            ctx.continuousQueries().onEntryUpdated(
-                                lsnrs,
-                                entry.key(),
-                                updRes.newValue(),
-                                updRes.oldValue(),
-                                internal,
-                                entry.partition(),
-                                false,
-                                false,
-                                updRes.updateCounter(),
-                                req.topologyVersion(),
-                                updRes.getFilterResults());
+                        if (lsnrs != null && updRes.continuousQueryClosures() != null) {
+                            for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
+                                clsr.onEntryUpdate();
                         }
 
                         entry.onUnlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e3dfa4d..32d10c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -37,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -103,8 +104,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Response count. */
     private volatile int resCnt;
 
-    /** */
-    private Map<UUID, CacheContinuousQueryListener> lsnrs;
+    /** Continuous query closures. */
+    private List<CacheContinuousQueryClosure> contQryClsrs;
 
     /**
      * @param cctx Cache context.
@@ -138,13 +139,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
     }
 
-    /**
-     * @param lsnrs Continuous query listeners.
-     */
-    void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
-        this.lsnrs = lsnrs;
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futVer.asGridUuid();
@@ -227,6 +221,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param addPrevVal If {@code true} sends previous value to backups.
      * @param prevVal Previous value.
      * @param updateCntr Partition update counter.
+     * @param clsrs Continuous query closures.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
@@ -236,8 +231,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr,
-        @Nullable Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes) {
+        long updateCntr, List<CacheContinuousQueryClosure> clsrs) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -249,6 +243,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
         keys.add(entry.key());
 
+        if (clsrs != null) {
+            if (contQryClsrs == null)
+                contQryClsrs = new ArrayList<>(keys.size());
+
+            contQryClsrs.addAll(clsrs);
+        }
+
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
@@ -282,30 +283,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     addPrevVal,
                     entry.partition(),
                     prevVal,
-                    updateCntr,
-                    filterRes,
-                    lsnrs != null);
-            }
-            else if (lsnrs != null && dhtNodes.size() == 1) {
-                try {
-                    cctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        val,
-                        prevVal,
-                        entry.key().internal() || !cctx.userCache(),
-                        entry.partition(),
-                        true,
-                        false,
-                        updateCntr,
-                        updateReq.topologyVersion(),
-                        filterRes
-                    );
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
-                        + val + ", err=" + e + "]");
-                }
+                    updateCntr);
             }
         }
     }
@@ -376,72 +354,17 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                if (!mappings.isEmpty() && lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                updateRes.addFailedKey(key, err);
-
-                                cctx.continuousQueries().skipUpdateEvent(
-                                    lsnrs,
-                                    key,
-                                    req.partitionId(i),
-                                    req.updateCounter(i),
-                                    updateReq.topologyVersion());
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
-                else
-                    for (KeyCacheObject key : keys)
-                        updateRes.addFailedKey(key, err);
+                for (KeyCacheObject key : keys)
+                    updateRes.addFailedKey(key, err);
+
+                if (contQryClsrs != null)
+                    for (CacheContinuousQueryClosure clsr : contQryClsrs)
+                        clsr.skipEvent();
             }
             else {
-                if (lsnrs != null) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                try {
-                                    cctx.continuousQueries().onEntryUpdated(
-                                        lsnrs,
-                                        key,
-                                        req.value(i),
-                                        req.localPreviousValue(i),
-                                        key.internal() || !cctx.userCache(),
-                                        req.partitionId(i),
-                                        true,
-                                        false,
-                                        req.updateCounter(i),
-                                        updateReq.topologyVersion(),
-                                        req.filterResult(i));
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.warn(log, "Failed to send continuous query message. [key=" + key +
-                                        ", newVal=" + req.value(i) +
-                                        ", err=" + e + "]");
-                                }
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
+                if (contQryClsrs != null)
+                    for (CacheContinuousQueryClosure clsr : contQryClsrs)
+                        clsr.onEntryUpdate();
             }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index d0cfebd..9d65fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -162,10 +162,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectTransient
     private List<CacheObject> locPrevVals;
 
-    /**  */
-    @GridDirectTransient
-    private List<Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>>> filterRes;
-
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -249,8 +245,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param partId Partition.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
-     * @param filterRes Filter results.
-     * @param storeLocPrevVal If {@code true} stores previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -261,20 +255,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr,
-        @Nullable Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes,
-        boolean storeLocPrevVal) {
+        @Nullable Long updateCntr) {
         keys.add(key);
 
         partIds.add(partId);
 
-        if (storeLocPrevVal) {
-            if (locPrevVals == null)
-                locPrevVals = new ArrayList<>();
-
-            locPrevVals.add(prevVal);
-        }
-
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
@@ -297,13 +282,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             updateCntrs.add(updateCntr);
         }
 
-        if (filterRes != null) {
-            if (this.filterRes == null)
-                this.filterRes = new ArrayList<>();
-
-            this.filterRes.add(filterRes);
-        }
-
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -504,17 +482,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
-     * @param idx Index.
-     * @return Filter result future.
-     */
-    public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterResult(int idx) {
-        if (filterRes != null && idx < filterRes.size())
-            return filterRes.get(idx);
-
-        return null;
-    }
-
-    /**
      * @param idx Near key index.
      * @return Key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
new file mode 100644
index 0000000..f000b93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+/**
+ * Runnable handle for a single continuous query event, as returned by the listener's {@code onEntryUpdated}.
+ */
+public interface CacheContinuousQueryClosure extends Runnable {
+    /**
+     * Completes the notification: delivers the event, or releases an asynchronous run waiting to deliver it.
+     */
+    public void onEntryUpdate();
+
+    /**
+     * Marks the event's entry as filtered (when present) and then completes as {@link #onEntryUpdate()}.
+     */
+    public void skipEvent();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 6e58bb5..2bfd53d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -34,9 +34,6 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
     /** */
     private final GridCacheContext cctx;
 
-    /** */
-    private IgniteInternalFuture<Boolean> filterFut;
-
     /** Entry. */
     @GridToStringExclude
     private final CacheContinuousQueryEntry e;
@@ -54,20 +51,6 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
     }
 
     /**
-     * @return Filter future.
-     */
-    public IgniteInternalFuture<Boolean> getFilterFuture() {
-        return filterFut;
-    }
-
-    /**
-     * @param filterFut Filter future.
-     */
-    public void setFilterFut(IgniteInternalFuture<Boolean> filterFut) {
-        this.filterFut = filterFut;
-    }
-
-    /**
      * @return Entry.
      */
     CacheContinuousQueryEntry entry() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 5cd7b2b..b3d5028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -34,22 +34,27 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.query.CacheAsyncCallback;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -68,11 +73,10 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -165,10 +169,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient boolean ignoreClsNotFound;
 
     /** */
-    private transient boolean asyncLsnr;
-
-    /** */
-    private transient boolean asyncFilter;
+    private transient boolean asyncCallback;
 
     /**
      * Required by {@link Externalizable}.
@@ -299,13 +300,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         final CacheEntryEventFilter filter = getEventFilter();
 
-        asyncLsnr = locLsnr instanceof CacheAsyncCallback;
+        asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
 
         if (filter != null) {
             ctx.resource().injectGeneric(filter);
 
-            asyncFilter = filter instanceof CacheAsyncCallback
-                || (filter instanceof JCacheQueryRemoteFilter && ((JCacheQueryRemoteFilter)filter).async());
+            if (!asyncCallback)
+                asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class)
+                    || (filter instanceof JCacheQueryRemoteFilter && ((JCacheQueryRemoteFilter)filter).async());
         }
 
         entryBufs = new ConcurrentHashMap<>();
@@ -320,9 +322,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         assert !skipPrimaryCheck || loc;
 
-        final boolean asyncLsnr0 = asyncLsnr;
-        final boolean asyncFilter0 = asyncFilter;
-
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -348,59 +347,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 return keepBinary;
             }
 
-            @Override public IgniteInternalFuture<Boolean> filter(final CacheContinuousQueryEvent<K, V> evt) {
-                final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-                try {
-                    if (filter == null)
-                        return new GridFinishedFuture<>(true);
-
-                    if (asyncFilter0) {
-                        final GridFutureAdapter<Boolean> f = new GridFutureAdapter<>();
-
-                        ctx.continuousQueryPool().execute(new FilterClosure(evt, filter, f,
-                            cctx.logger(CacheContinuousQueryHandler.class)), evt.partitionId());
-
-                        return f;
-                    }
-                    else
-                        return new GridFinishedFuture<>(filter.evaluate(evt));
-                }
-                catch (Exception e) {
-                    U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
-
-                    return new GridFinishedFuture<>(true);
-                }
-            }
-
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
-                boolean recordIgniteEvt) {
+            @Override public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+                boolean primary,
+                boolean recordIgniteEvt,
+                boolean fireEvent) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
-                    return;
+                    return null;
 
                 final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 // Check that cache stopped.
                 if (cctx == null)
-                    return;
+                    return null;
 
                 final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                IgniteInternalFuture<Boolean> notify;
-
-                if (evt.getFilterFuture() == null) {
-                    if (!evt.entry().isFiltered() && filter != null)
-                        notify = filter(evt);
-                    else
-                        notify = new GridFinishedFuture<>(!evt.entry().isFiltered());
-                }
-                else
-                    notify = evt.getFilterFuture();
-
-                final ContinuousQueryClosure clsr = new ContinuousQueryClosure(taskName(),
+                final ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl(taskName(),
                     recordIgniteEvt,
                     routineId,
                     nodeId,
@@ -409,14 +374,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     primary,
                     cctx,
                     filter,
-                    notify,
                     evt,
+                    fireEvent,
                     cache);
 
-                if (asyncFilter0 || asyncLsnr0)
-                    ctx.continuousQueryPool().execute(clsr, evt.partitionId());
+                if (!asyncCallback) {
+                    clsr.filter();
+
+                    if (fireEvent)
+                        clsr.onEntryUpdate();
+                }
                 else
-                    clsr.run();
+                    ctx.continuousQueryPool().execute(clsr, evt.partitionId());
+
+                return clsr;
             }
 
             @Override public void onUnregister() {
@@ -462,15 +433,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
-                boolean primary) {
+            @Override public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+                AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                onEntryUpdated(evt, primary, false);
+                return onEntryUpdated(evt, primary, false, fireEvnt);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -579,76 +550,90 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
 
-        for (CacheContinuousQueryEntry e : entries) {
-            GridCacheDeploymentManager depMgr = cctx.deploy();
+        List<PartitionRecovery> recoveries = new ArrayList<>();
+
+        try {
+            for (CacheContinuousQueryEntry e : entries) {
+                GridCacheDeploymentManager depMgr = cctx.deploy();
 
-            ClassLoader ldr = depMgr.globalLoader();
+                ClassLoader ldr = depMgr.globalLoader();
 
-            if (ctx.config().isPeerClassLoadingEnabled()) {
-                GridDeploymentInfo depInfo = e.deployInfo();
+                if (ctx.config().isPeerClassLoadingEnabled()) {
+                    GridDeploymentInfo depInfo = e.deployInfo();
 
-                if (depInfo != null) {
-                    depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
-                        depInfo.participants(), depInfo.localDeploymentOwner());
+                    if (depInfo != null) {
+                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
+                            depInfo.participants(), depInfo.localDeploymentOwner());
+                    }
                 }
-            }
 
-            try {
-                e.unmarshal(cctx, ldr);
+                try {
+                    e.unmarshal(cctx, ldr);
 
-                entries0.addAll(handleEvent(ctx, e));
-            }
-            catch (IgniteCheckedException ex) {
-                if (ignoreClsNotFound)
-                    assert internal;
-                else
-                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                    T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> evts = handleEvent(ctx, e);
+
+                    if (evts.get2() != null)
+                        recoveries.add(evts.get2());
+
+                    entries0.addAll(evts.get1());
+                }
+                catch (IgniteCheckedException ex) {
+                    if (ignoreClsNotFound)
+                        assert internal;
+                    else
+                        U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                }
             }
-        }
 
-        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+            final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
 
-        if (!entries0.isEmpty()) {
-            if (asyncLsnr) {
-                Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                    new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() {
-                        @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply(
-                            CacheContinuousQueryEntry e) {
-                            return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                        }
-                    },
-                    new IgnitePredicate<CacheContinuousQueryEntry>() {
-                        @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                            return !entry.isFiltered();
+            if (!entries0.isEmpty()) {
+                if (asyncCallback) {
+                    Iterable<CacheContinuousQueryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                        new C1<CacheContinuousQueryEntry, CacheContinuousQueryEvent<? extends K, ? extends V>>() {
+                            @Override public CacheContinuousQueryEvent<? extends K, ? extends V> apply(
+                                CacheContinuousQueryEntry e) {
+                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                            }
+                        },
+                        new IgnitePredicate<CacheContinuousQueryEntry>() {
+                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                                return !entry.isFiltered();
+                            }
                         }
-                    }
-                );
+                    );
 
-                for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) {
-                    ctx.continuousQueryPool().execute(new Runnable() {
-                        @Override public void run() {
-                            locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>singleton(e));
-                        }
-                    }, e.partitionId());
+                    for (final CacheContinuousQueryEvent<? extends K, ? extends V> e : evts) {
+                        ctx.continuousQueryPool().execute(new Runnable() {
+                            @Override public void run() {
+                                locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends K, ? extends V>>singleton(e));
+                            }
+                        }, e.partitionId());
+                    }
                 }
-            }
-            else {
-                Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                    new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                        @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                            return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                        }
-                    },
-                    new IgnitePredicate<CacheContinuousQueryEntry>() {
-                        @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                            return !entry.isFiltered();
+                else {
+                    Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                            @Override
+                            public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                            }
+                        },
+                        new IgnitePredicate<CacheContinuousQueryEntry>() {
+                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                                return !entry.isFiltered();
+                            }
                         }
-                    }
-                );
+                    );
 
-                locLsnr.onUpdated(evts);
+                    locLsnr.onUpdated(evts);
+                }
             }
         }
+        finally {
+            for (PartitionRecovery rec : recoveries)
+                rec.unlock();
+        }
     }
 
     /**
@@ -656,24 +641,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param e entry.
      * @return Entry collection.
      */
-    private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx,
+    private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> handleEvent(GridKernalContext ctx,
         CacheContinuousQueryEntry e) {
         assert e != null;
 
         if (internal) {
             if (e.isFiltered())
-                return Collections.emptyList();
+                return new T2(Collections.emptyList(), null);
             else
-                return F.asList(e);
+                return new T2(F.asList(e), null);
         }
 
         // Initial query entry or evicted entry. These events should be fired immediately.
         if (e.updateCounter() == -1L)
-            return F.asList(e);
+            return new T2(F.asList(e), null);
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
 
-        return rec.collectEntries(e);
+        return new T2<>(rec.collectEntries(e), rec);
     }
 
     /**
@@ -777,6 +762,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         /** */
         private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
 
+        /** */
+        private Lock lock = new ReentrantLock();
+
         /**
          * @param log Logger.
          * @param topVer Topology version.
@@ -801,17 +789,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @return Collection entries which will be fired.
          */
         public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
-            assert entry != null;
+            lock.lock();
 
-            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
-                assert entry.updateCounter() == 0L : entry;
+            try {
+                assert entry != null;
 
-                return F.asList(entry);
-            }
+                if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+                    assert entry.updateCounter() == 0L : entry;
 
-            List<CacheContinuousQueryEntry> entries;
+                    return F.asList(entry);
+                }
+
+                List<CacheContinuousQueryEntry> entries;
 
-            synchronized (pendingEvts) {
                 // Received first event.
                 if (curTop == AffinityTopologyVersion.NONE) {
                     lastFiredEvt = entry.updateCounter();
@@ -899,9 +889,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             break;
                     }
                 }
+
+                return entries;
+            }
+            catch (Exception e) {
+                lock.unlock();
+
+                throw new IgniteException("Failed to collect entries.");
             }
+        }
 
-            return entries;
+        /**
+         * Releases the lock that {@link #collectEntries} leaves held when it returns normally.
+         */
+        public void unlock() {
+            lock.unlock();
         }
     }
 
@@ -1255,56 +1257,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      *
      */
-    private static class FilterClosure implements Runnable {
+    private class ContinuousQueryClosureImpl implements CacheContinuousQueryClosure {
         /** */
-        private final CacheContinuousQueryEvent evt;
-
-        /** */
-        private final GridFutureAdapter<Boolean> f;
+        private final IgniteCache cache;
 
         /** */
         private final IgniteLogger log;
 
         /** */
-        private final CacheEntryEventFilter filter;
-
-        /**
-         * @param evt Continuous query event.
-         * @param filter Filter.
-         * @param f Future.
-         * @param log Logger.
-         */
-        public FilterClosure(CacheContinuousQueryEvent evt,
-            CacheEntryEventFilter filter,
-            GridFutureAdapter<Boolean> f,
-            IgniteLogger log) {
-            this.evt = evt;
-            this.f = f;
-            this.log = log;
-            this.filter = filter;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            boolean res = true;
-
-            try {
-                res = filter.evaluate(evt);
-            }
-            catch (Exception e) {
-                U.error(log, "CacheEntryEventFilter failed: " + e);
-            }
-
-            f.onDone(res);
-        }
-    }
-
-    /**
-     *
-     */
-    private class ContinuousQueryClosure implements Runnable {
-        /** */
-        private final IgniteCache cache;
+        private final boolean fireEvent;
 
         /** */
         private CacheContinuousQueryEvent<K, V> evt;
@@ -1313,9 +1274,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         private CacheEntryEventFilter filter;
 
         /** */
-        private IgniteInternalFuture<Boolean> notifyFut;
-
-        /** */
         private final GridCacheContext<K, V> cctx;
 
         /** */
@@ -1339,6 +1297,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         /** */
         private final String taskName;
 
+        /** */
+        private boolean notify;
+
+        /** */
+        private boolean backup;
+
+        /** */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
         /**
          * @param taskName Task name.
          * @param recordIgniteEvt Fired event.
@@ -1349,11 +1316,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param primary Primary flag.
          * @param cctx Cache context.
          * @param filter Filter.
-         * @param notifyFut Notify future.
          * @param evt Event.
+         * @param fireEvent Immediately fire event.
          * @param cache Cache.
          */
-        public ContinuousQueryClosure(String taskName,
+        ContinuousQueryClosureImpl(String taskName,
             boolean recordIgniteEvt,
             UUID routineId,
             UUID nodeId,
@@ -1362,9 +1329,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             boolean primary,
             GridCacheContext<K, V> cctx,
             CacheEntryEventFilter filter,
-            IgniteInternalFuture<Boolean> notifyFut,
             CacheContinuousQueryEvent<K, V> evt,
-            IgniteCache cache) {
+            boolean fireEvent, IgniteCache cache) {
             this.taskName = taskName;
             this.recordIgniteEvt = recordIgniteEvt;
             this.routineId = routineId;
@@ -1374,48 +1340,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             this.primary = primary;
             this.cctx = cctx;
             this.filter = filter;
-            this.notifyFut = notifyFut;
             this.evt = evt;
             this.cache = cache;
+            this.fireEvent = fireEvent;
+
+            log = ctx.log(CacheContinuousQueryHandler.class);
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
-            boolean notify;
+            filter();
+
+            if (fireEvent || waitIfAsync())
+                onEntryUpdate();
+        }
 
-            if (notifyFut == null) {
-                notify = !evt.entry().isFiltered();
+        /**
+         * @return {@code True} if the {@code primary} flag is set or {@code skipPrimaryCheck} is set.
+         */
+        private boolean primary() {
+            return primary || skipPrimaryCheck;
+        }
 
-                if (notify && filter != null) {
-                    try {
-                        notify = filter.evaluate(evt);
-                    }
-                    catch (Exception e) {
-                        U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed.", e);
-                    }
-                }
+        /**
+         * @return {@code false} immediately when {@code backup} is set, else {@code true} after the latch wait (even if interrupted).
+         */
+        private boolean waitIfAsync() {
+            if (backup)
+                return false;
+
+            try {
+                U.await(latch);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                log.error("Failed to wait latch.", e);
             }
-            else {
-                try {
-                    notify = notifyFut.get();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed.", e);
 
-                    notify = true;
-                }
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void skipEvent() {
+            if (evt != null && evt.entry() != null)
+                evt.entry().markFiltered();
+
+            onEntryUpdate();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEntryUpdate() {
+            if (backup)
+                return;
+
+            if (!fireEvent && asyncCallback) {
+                latch.countDown();
+
+                return;
             }
 
             try {
                 final CacheContinuousQueryEntry entry = evt.entry();
 
-                if (!notify)
-                    entry.markFiltered();
+                if (loc) {
+                    if (!locCache) {
+                        T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> events = handleEvent(ctx, entry);
 
-                if (primary || skipPrimaryCheck) {
-                    if (loc) {
-                        if (!locCache) {
-                            Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
+                        try {
+                            Collection<CacheContinuousQueryEntry> entries = events.get1();
 
                             if (!entries.isEmpty()) {
                                 Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
@@ -1438,35 +1429,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                                     sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
                             }
                         }
-                        else {
-                            if (!entry.isFiltered())
-                                locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                        finally {
+                            if (events.get2() != null)
+                                events.get2().unlock();
                         }
                     }
                     else {
                         if (!entry.isFiltered())
-                            prepareEntry(cctx, nodeId, entry);
-
-                        CacheContinuousQueryEntry e = handleEntry(entry);
-
-                        if (e != null)
-                            ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+                            locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
                     }
                 }
                 else {
-                    if (!internal) {
-                        // Skip init query and expire entries.
-                        if (entry.updateCounter() != -1L) {
-                            entry.markBackup();
+                    if (!entry.isFiltered())
+                        prepareEntry(cctx, nodeId, entry);
 
-                            backupQueue.add(entry);
-                        }
-                    }
+                    CacheContinuousQueryEntry e = handleEntry(entry);
+
+                    if (e != null)
+                        ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                 }
             }
             catch (ClusterTopologyCheckedException ex) {
-                IgniteLogger log = ctx.log(getClass());
-
                 if (log.isDebugEnabled())
                     log.debug("Failed to send event notification to node, node left cluster " +
                         "[node=" + nodeId + ", err=" + ex + ']');
@@ -1497,6 +1480,39 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 ));
             }
         }
+
+        /**
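+         * Evaluates the filter, marks a rejected entry as filtered and sets the backup flag on a non-primary node.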
+         */
+        public void filter() {
+            CacheContinuousQueryEntry entry = evt.entry();
+
+            notify = !entry.isFiltered();
+
+            try {
+                if (notify && filter != null)
+                    notify = filter.evaluate(evt);
+            }
+            catch (Exception e) {
+                U.error(log, "CacheEntryEventFilter failed: " + e);
+            }
+
+            if (!notify)
+                entry.markFiltered();
+
+            if (!primary()) {
+                if (!internal) {
+                    // Skip init query and expire entries.
+                    if (entry.updateCounter() != -1L) {
+                        entry.markBackup();
+
+                        backupQueue.add(entry);
+                    }
+                }
+
+                backup = true;
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 405dbff..bf1d4a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -37,17 +37,10 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param evt Event
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
+     * @param fireEvent Whether the event should be fired immediately.
      */
-    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
-
-    /**
-     * Filters event.
-     *
-     * @param evt Event.
-     * @return {@code True} if the evaluation passes, otherwise false.
-     *   The effect of returning true is that listener will be invoked.
-     */
-    public IgniteInternalFuture<Boolean> filter(CacheContinuousQueryEvent<K, V> evt);
+    public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+        boolean recordIgniteEvt, boolean fireEvent);
 
     /**
      * Listener unregistered callback.
@@ -79,7 +72,8 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param topVer Topology version.
      * @param primary Primary
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
+    public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+        AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 2b3052f..12819c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -24,8 +24,8 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
@@ -47,11 +47,10 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.CacheAsyncCallback;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -169,32 +168,21 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param key Entry key.
      * @param partId Partition id.
      * @param updCntr Updated counter.
-     * @param topVer Topology version.
-     */
-    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
-        KeyCacheObject key,
-        int partId,
-        long updCntr,
-        AffinityTopologyVersion topVer) {
-        skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer);
-    }
-
-    /**
-     * @param lsnrs Listeners to notify.
-     * @param key Entry key.
-     * @param partId Partition id.
-     * @param updCntr Updated counter.
-     * @param topVer Topology version.
      * @param primary Primary.
+     * @param fireEvnt Whether the event should be fired immediately.
+     * @param topVer Topology version.
      */
-    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+    public List<CacheContinuousQueryClosure> skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
         int partId,
         long updCntr,
         boolean primary,
+        boolean fireEvnt,
         AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
+        List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrs.size());
+
         for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
             CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
                 cctx.cacheId(),
@@ -210,8 +198,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.skipUpdateEvent(evt, topVer, primary);
+            clsrs.add(lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt));
         }
+
+        return clsrs;
     }
 
     /**
@@ -254,6 +244,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean internal,
         int partId,
         boolean primary,
+        boolean fireEvnt,
         boolean preload,
         long updateCntr,
         AffinityTopologyVersion topVer) throws IgniteCheckedException {
@@ -268,113 +259,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 internal,
                 partId,
                 primary,
+                fireEvnt,
                 preload,
                 updateCntr,
-                topVer,
-                null);
-        }
-    }
-
-    /**
-     * @param lsnrCol Listeners to notify.
-     * @param key Key.
-     * @param newVal New value.
-     * @param oldVal Old value.
-     * @param partId Partition.
-     * @param preload Whether update happened during preloading.
-     * @param updateCntr Update counter.
-     * @param topVer Topology version.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterEntry(
-        Map<UUID, CacheContinuousQueryListener> lsnrCol,
-        KeyCacheObject key,
-        CacheObject newVal,
-        CacheObject oldVal,
-        int partId,
-        boolean preload,
-        long updateCntr,
-        AffinityTopologyVersion topVer)
-        throws IgniteCheckedException
-    {
-        assert key != null;
-        assert lsnrCol != null;
-
-        boolean hasNewVal = newVal != null;
-        boolean hasOldVal = oldVal != null;
-
-        if (!hasNewVal && !hasOldVal)
-            return null;
-
-        EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
-
-        boolean initialized = false;
-
-        Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> res = new HashMap<>();
-
-        for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
-            if (preload && !lsnr.notifyExisting())
-                continue;
-
-            if (!initialized) {
-                if (lsnr.oldValueRequired()) {
-                    oldVal = (CacheObject)cctx.unwrapTemporary(oldVal);
-
-                    if (oldVal != null)
-                        oldVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
-                }
-
-                if (newVal != null)
-                    newVal.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
-
-                initialized = true;
-            }
-
-            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                cctx.cacheId(),
-                evtType,
-                key,
-                newVal,
-                lsnr.oldValueRequired() ? oldVal : null,
-                lsnr.keepBinary(),
-                partId,
-                updateCntr,
                 topVer);
-
-            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
-
-            res.put(lsnr, lsnr.filter(evt));
         }
-
-        return res;
-    }
-
-    /**
-     * @param lsnrCol Listeners to notify.
-     * @param key Key.
-     * @param newVal New value.
-     * @param oldVal Old value.
-     * @param internal Internal entry (internal key or not user cache),
-     * @param partId Partition.
-     * @param primary {@code True} if called on primary node.
-     * @param preload Whether update happened during preloading.
-     * @param updateCntr Update counter.
-     * @param topVer Topology version.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void onEntryUpdated(
-        Map<UUID, CacheContinuousQueryListener> lsnrCol,
-        KeyCacheObject key,
-        CacheObject newVal,
-        CacheObject oldVal,
-        boolean internal,
-        int partId,
-        boolean primary,
-        boolean preload,
-        long updateCntr,
-        AffinityTopologyVersion topVer) throws IgniteCheckedException {
-        onEntryUpdated(lsnrCol, key, newVal, oldVal, internal, partId, primary, preload, updateCntr, topVer, null);
     }
 
     /**
@@ -385,13 +274,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param internal Internal entry (internal key or not user cache),
      * @param partId Partition.
      * @param primary {@code True} if called on primary node.
+     * @param fireEvnt Whether the event should be fired immediately.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
-     * @param filterRes Filter results.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryUpdated(
+    public List<CacheContinuousQueryClosure> onEntryUpdated(
         Map<UUID, CacheContinuousQueryListener> lsnrCol,
         KeyCacheObject key,
         CacheObject newVal,
@@ -399,10 +288,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean internal,
         int partId,
         boolean primary,
+        boolean fireEvnt,
         boolean preload,
         long updateCntr,
-        AffinityTopologyVersion topVer,
-        Map<CacheContinuousQueryListener, IgniteInternalFuture<Boolean>> filterRes)
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert key != null;
@@ -411,11 +300,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
-        if (!hasNewVal && !hasOldVal) {
-            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer);
-
-            return;
-        }
+        if (!hasNewVal && !hasOldVal)
+            return skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer);
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 
@@ -423,6 +309,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
+        List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrCol.size());
+
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
                 continue;
@@ -455,11 +343,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            if (filterRes != null)
-                evt.setFilterFut(filterRes.get(lsnr));
+            CacheContinuousQueryClosure clsr = lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt);
 
-            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+            if (clsr != null)
+                clsrs.add(clsr);
         }
+
+        return clsrs;
     }
 
     /**
@@ -512,7 +402,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true);
             }
         }
     }
@@ -1187,7 +1077,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
          * @return {@code True} if filter should be executed in non-system thread.
          */
         protected boolean async() {
-            return impl != null && impl instanceof CacheAsyncCallback;
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
new file mode 100644
index 0000000..d2069c7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * If a {@link CacheEntryEventFilter filter} or a {@link CacheEntryListener listener} is
+ * annotated with this annotation, it will be executed on a separate thread pool. This allows
+ * the cache API to be used in callbacks.
+ * <p>
+ * Different implementations can use different thread pools. For example, a continuous query will use the continuous
+ * query thread pool, which can be configured by {@link IgniteConfiguration#setContinuousQueryPoolSize(int)}.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface IgniteAsyncCallback {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 8eab9d3..7958ac3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.CacheAsyncCallback;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -590,7 +590,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     /**
      *
      */
-    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter implements CacheAsyncCallback {
+    @IgniteAsyncCallback
+    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter {
         /**
          * @param clsr Closure.
          */
@@ -632,7 +633,8 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     /**
      *
      */
-    private static class CacheInvokeListenerAsync extends CacheInvokeListener implements CacheAsyncCallback {
+    @IgniteAsyncCallback
+    private static class CacheInvokeListenerAsync extends CacheInvokeListener {
         /**
          * @param clsr Closure.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
index 6efa3c4..5d7afdc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
@@ -26,7 +26,7 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.query.CacheAsyncCallback;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -43,8 +43,9 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest
     /**
      *
      */
+    @IgniteAsyncCallback
     protected static class NonSerializableAsyncFilter implements
-        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, CacheAsyncCallback, Externalizable {
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
         /** */
         public NonSerializableAsyncFilter() {
             // No-op.


Mime
View raw message