ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [04/38] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Wed, 18 May 2016 10:58:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 767697a..9ae2972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -48,6 +48,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -58,7 +59,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
@@ -66,13 +70,14 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -159,6 +164,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** */
     private transient boolean ignoreClsNotFound;
 
+    /** Whether the local listener or remote filter requested asynchronous callback execution. */
+    private transient boolean asyncCallback;
+
+    /** */
+    private transient UUID nodeId;
+
+    /** */
+    private transient UUID routineId;
+
+    /** */
+    private transient GridKernalContext ctx;
+
+    /** */
+    private transient IgniteLogger log;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -283,13 +303,36 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert routineId != null;
         assert ctx != null;
 
-        if (locLsnr != null)
-            ctx.resource().injectGeneric(locLsnr);
+        if (locLsnr != null) {
+            if (locLsnr instanceof JCacheQueryLocalListener) {
+                ctx.resource().injectGeneric(((JCacheQueryLocalListener)locLsnr).impl);
+
+                asyncCallback = ((JCacheQueryLocalListener)locLsnr).async();
+            }
+            else {
+                ctx.resource().injectGeneric(locLsnr);
+
+                asyncCallback = U.hasAnnotation(locLsnr, IgniteAsyncCallback.class);
+            }
+        }
 
         final CacheEntryEventFilter filter = getEventFilter();
 
-        if (filter != null)
-            ctx.resource().injectGeneric(filter);
+        if (filter != null) {
+            if (filter instanceof JCacheQueryRemoteFilter) {
+                if (((JCacheQueryRemoteFilter)filter).impl != null)
+                    ctx.resource().injectGeneric(((JCacheQueryRemoteFilter)filter).impl);
+
+                if (!asyncCallback)
+                    asyncCallback = ((JCacheQueryRemoteFilter)filter).async();
+            }
+            else {
+                ctx.resource().injectGeneric(filter);
+
+                if (!asyncCallback)
+                    asyncCallback = U.hasAnnotation(filter, IgniteAsyncCallback.class);
+            }
+        }
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -299,10 +342,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         rcvs = new ConcurrentHashMap<>();
 
+        this.nodeId = nodeId;
+
+        this.routineId = routineId;
+
+        this.ctx = ctx;
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         assert !skipPrimaryCheck || loc;
 
+        log = ctx.log(CacheContinuousQueryHandler.class);
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -324,15 +375,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
             }
 
-            /** {@inheritDoc} */
             @Override public boolean keepBinary() {
                 return keepBinary;
             }
 
-            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
-                boolean recordIgniteEvt) {
+            @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
+                boolean primary,
+                final boolean recordIgniteEvt,
+                GridDhtAtomicUpdateFuture fut) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
-                    return;
+                    return ;
 
                 final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
@@ -343,110 +395,33 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                boolean notify = !evt.entry().isFiltered();
+                if (asyncCallback) {
+                    ContinuousQueryAsyncClosure clsr = new ContinuousQueryAsyncClosure(
+                        primary,
+                        evt,
+                        recordIgniteEvt,
+                        fut);
 
-                if (notify && filter != null) {
-                    try {
-                        notify = filter.evaluate(evt);
-                    }
-                    catch (Exception e) {
-                        U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter failed: " + e);
-                    }
+                    ctx.asyncCallbackPool().execute(clsr, evt.partitionId());
                 }
-
-                try {
-                    final CacheContinuousQueryEntry entry = evt.entry();
-
-                    if (!notify)
-                        entry.markFiltered();
+                else {
+                    final boolean notify = filter(evt, primary);
 
                     if (primary || skipPrimaryCheck) {
-                        if (loc) {
-                            if (!locCache) {
-                                Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
-
-                                if (!entries.isEmpty()) {
-                                    final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
-
-                                    Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
-                                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                                            @Override public CacheEntryEvent<? extends K, ? extends V> apply(
-                                                CacheContinuousQueryEntry e) {
-                                                return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                                            }
-                                        },
-                                        new IgnitePredicate<CacheContinuousQueryEntry>() {
-                                            @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                                                return !entry.isFiltered();
-                                            }
-                                        }
-                                    );
-
-                                    if (!F.isEmpty(evts))
-                                        locLsnr.onUpdated(evts);
-
-                                    if (!internal && !skipPrimaryCheck)
-                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-                                }
-                            }
-                            else {
-                                if (!entry.isFiltered())
-                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                            }
-                        }
+                        if (fut == null)
+                            onEntryUpdate(evt, notify, loc, recordIgniteEvt);
                         else {
-                            if (!entry.isFiltered())
-                                prepareEntry(cctx, nodeId, entry);
+                            fut.addContinuousQueryClosure(new CI1<Boolean>() {
+                                @Override public void apply(Boolean suc) {
+                                    if (!suc)
+                                        evt.entry().markFiltered();
 
-                            CacheContinuousQueryEntry e = handleEntry(entry);
-
-                            if (e != null)
-                                ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
-                        }
-                    }
-                    else {
-                        if (!internal) {
-                            // Skip init query and expire entries.
-                            if (entry.updateCounter() != -1L) {
-                                entry.markBackup();
-
-                                backupQueue.add(entry);
-                            }
+                                    onEntryUpdate(evt, notify, loc, recordIgniteEvt);
+                                }
+                            });
                         }
                     }
                 }
-                catch (ClusterTopologyCheckedException ex) {
-                    IgniteLogger log = ctx.log(getClass());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send event notification to node, node left cluster " +
-                            "[node=" + nodeId + ", err=" + ex + ']');
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                }
-
-                if (recordIgniteEvt && notify) {
-                    ctx.event().record(new CacheQueryReadEvent<>(
-                        ctx.discovery().localNode(),
-                        "Continuous query executed.",
-                        EVT_CACHE_QUERY_OBJECT_READ,
-                        CacheQueryType.CONTINUOUS.name(),
-                        cacheName,
-                        null,
-                        null,
-                        null,
-                        filter instanceof CacheEntryEventSerializableFilter ?
-                            (CacheEntryEventSerializableFilter)filter : null,
-                        null,
-                        nodeId,
-                        taskName(),
-                        evt.getKey(),
-                        evt.getValue(),
-                        evt.getOldValue(),
-                        null
-                    ));
-                }
             }
 
             @Override public void onUnregister() {
@@ -492,15 +467,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
-                boolean primary) {
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+                AffinityTopologyVersion topVer, boolean primary) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                onEntryUpdated(evt, primary, false);
+                onEntryUpdated(evt, primary, false, null);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -597,17 +572,73 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) {
+    @Override public void notifyCallback(final UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        final GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;
         assert ctx != null;
 
-        Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
+        final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs;
+
+        if (entries.isEmpty())
+            return;
+
+        if (asyncCallback) {
+            IgniteStripedThreadPoolExecutor asyncPool = ctx.asyncCallbackPool();
+
+            int threadId = asyncPool.threadId(entries.get(0).partition());
+
+            int startIdx = 0;
+
+            if (entries.size() != 1) {
+                for (int i = 1; i < entries.size(); i++) {
+                    int curThreadId = asyncPool.threadId(entries.get(i).partition());
 
+                    // While entries map to the same pool thread, extend the current range and avoid creating new collections.
+                    if (curThreadId == threadId)
+                        continue;
+
+                    final int i0 = i;
+                    final int startIdx0 = startIdx;
+
+                    asyncPool.execute(new Runnable() {
+                        @Override public void run() {
+                            notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0));
+                        }
+                    }, threadId);
+
+                    startIdx = i0;
+                    threadId = curThreadId;
+                }
+            }
+
+            final int startIdx0 = startIdx;
+
+            asyncPool.execute(new Runnable() {
+                @Override public void run() {
+                    notifyCallback0(nodeId, ctx,
+                        startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size()));
+                }
+            }, threadId);
+        }
+        else
+            notifyCallback0(nodeId, ctx, entries);
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @param ctx Kernal context.
+     * @param entries Entries.
+     */
+    private void notifyCallback0(UUID nodeId,
+        final GridKernalContext ctx,
+        Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
-        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+        final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>(entries.size());
 
         for (CacheContinuousQueryEntry e : entries) {
             GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -626,7 +657,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             try {
                 e.unmarshal(cctx, ldr);
 
-                entries0.addAll(handleEvent(ctx, e));
+                Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e);
+
+                if (evts != null && !evts.isEmpty())
+                    entries0.addAll(evts);
             }
             catch (IgniteCheckedException ex) {
                 if (ignoreClsNotFound)
@@ -636,24 +670,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
         }
 
-        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
-
-        if (!entries0.isEmpty()) {
-            Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                    @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                        return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                    }
-                },
-                new IgnitePredicate<CacheContinuousQueryEntry>() {
-                    @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                        return !entry.isFiltered();
-                    }
-                }
-            );
-
-            locLsnr.onUpdated(evts);
-        }
+        if (!entries0.isEmpty())
+            locLsnr.onUpdated(entries0);
     }
 
     /**
@@ -661,24 +679,142 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param e entry.
      * @return Entry collection.
      */
-    private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx,
+    private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext ctx,
         CacheContinuousQueryEntry e) {
         assert e != null;
 
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
         if (internal) {
             if (e.isFiltered())
                 return Collections.emptyList();
             else
-                return F.asList(e);
+                return F.<CacheEntryEvent<? extends K, ? extends V>>
+                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
         }
 
         // Initial query entry or evicted entry. These events should be fired immediately.
-        if (e.updateCounter() == -1L)
-            return F.asList(e);
+        if (e.updateCounter() == -1L) {
+            return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList(
+                new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
+                Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
+        }
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
 
-        return rec.collectEntries(e);
+        return rec.collectEntries(e, cctx, cache);
+    }
+
+    /**
+     * @param primary Primary.
+     * @param evt Query event.
+     * @return {@code True} if event passed filter, otherwise {@code false}.
+     */
+    public boolean filter(CacheContinuousQueryEvent evt, boolean primary) {
+        CacheContinuousQueryEntry entry = evt.entry();
+
+        boolean notify = !entry.isFiltered();
+
+        try {
+            if (notify && getEventFilter() != null)
+                notify = getEventFilter().evaluate(evt);
+        }
+        catch (Exception e) {
+            U.error(log, "CacheEntryEventFilter failed: " + e);
+        }
+
+        if (!notify)
+            entry.markFiltered();
+
+        if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) {
+            entry.markBackup();
+
+            backupQueue.add(entry);
+        }
+
+        return notify;
+    }
+
+    /**
+     * @param evt Continuous query event.
+     * @param notify Notify flag.
+     * @param loc Listener deployed on this node.
+     * @param recordIgniteEvt Record ignite event.
+     */
+    private void onEntryUpdate(CacheContinuousQueryEvent evt, boolean notify, boolean loc, boolean recordIgniteEvt) {
+        try {
+            GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+            if (cctx == null)
+                return;
+
+            final CacheContinuousQueryEntry entry = evt.entry();
+
+            if (loc) {
+                if (!locCache) {
+                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
+
+                    if (!evts.isEmpty()) {
+                        locLsnr.onUpdated(evts);
+
+                        if (!internal && !skipPrimaryCheck)
+                            sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                    }
+                }
+                else {
+                    if (!entry.isFiltered())
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                }
+            }
+            else {
+                if (!entry.isFiltered())
+                    prepareEntry(cctx, nodeId, entry);
+
+                CacheContinuousQueryEntry e = handleEntry(entry);
+
+                if (e != null)
+                    ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+            }
+        }
+        catch (ClusterTopologyCheckedException ex) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send event notification to node, node left cluster " +
+                    "[node=" + nodeId + ", err=" + ex + ']');
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+        }
+
+        if (recordIgniteEvt && notify) {
+            ctx.event().record(new CacheQueryReadEvent<>(
+                ctx.discovery().localNode(),
+                "Continuous query executed.",
+                EVT_CACHE_QUERY_OBJECT_READ,
+                CacheQueryType.CONTINUOUS.name(),
+                cacheName,
+                null,
+                null,
+                null,
+                getEventFilter() instanceof CacheEntryEventSerializableFilter ?
+                    (CacheEntryEventSerializableFilter)getEventFilter() : null,
+                null,
+                nodeId,
+                taskName(),
+                evt.getKey(),
+                evt.getValue(),
+                evt.getOldValue(),
+                null
+            ));
+        }
+    }
+
+    /**
+     * @return Task name.
+     */
+    private String taskName() {
+        return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
     }
 
     /**
@@ -710,9 +846,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         }
                     }
                 }
-                else if (initUpdCntrs != null) {
+                else if (initUpdCntrs != null)
                     partCntr = initUpdCntrs.get(partId);
-                }
             }
 
             rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
@@ -802,19 +937,26 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         /**
          * Add continuous entry.
          *
+         * @param cctx Cache context.
+         * @param cache Cache.
          * @param entry Cache continuous query entry.
-         * @return Collection entries which will be fired.
+         * @return Collection of entries which will be fired. This collection should contain only non-filtered events.
          */
-        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
+        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(
+            CacheContinuousQueryEntry entry,
+            GridCacheContext cctx,
+            IgniteCache cache
+        ) {
             assert entry != null;
 
             if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
                 assert entry.updateCounter() == 0L : entry;
 
-                return F.asList(entry);
+                return F.<CacheEntryEvent<? extends K, ? extends V>>
+                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
             }
 
-            List<CacheContinuousQueryEntry> entries;
+            List<CacheEntryEvent<? extends K, ? extends V>> entries;
 
             synchronized (pendingEvts) {
                 // Received first event.
@@ -823,7 +965,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                     curTop = entry.topologyVersion();
 
-                    return F.asList(entry);
+                    return !entry.isFiltered() ?
+                        F.<CacheEntryEvent<? extends K, ? extends V>>
+                            asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry)) :
+                        Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
                 }
 
                 if (curTop.compareTo(entry.topologyVersion()) < 0) {
@@ -832,7 +977,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                         for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
                             if (evt != HOLE && !evt.isFiltered())
-                                entries.add(evt);
+                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, evt));
                         }
 
                         pendingEvts.clear();
@@ -841,7 +986,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                         lastFiredEvt = entry.updateCounter();
 
-                        entries.add(entry);
+                        if (!entry.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 
                         return entries;
                     }
@@ -880,7 +1026,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
                         if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                            entries.add(e.getValue());
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
 
                         lastFiredEvt = e.getKey();
 
@@ -896,7 +1042,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             ++lastFiredEvt;
 
                             if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                                entries.add(e.getValue());
+                                entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue()));
 
                             iter.remove();
                         }
@@ -1258,6 +1404,87 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
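+     * Closure submitted to the async callback pool: applies the event filter, then passes the event to {@code onEntryUpdate}.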
+     */
+    private class ContinuousQueryAsyncClosure implements Runnable {
+        /** */
+        private final CacheContinuousQueryEvent<K, V> evt;
+
+        /** */
+        private final boolean primary;
+
+        /** */
+        private final boolean recordIgniteEvt;
+
+        /** */
+        private final IgniteInternalFuture<?> fut;
+
+        /**
+         * @param primary Primary flag.
+         * @param evt Event.
+         * @param recordIgniteEvt Whether to record Ignite event.
+         * @param fut Dht future.
+         */
+        ContinuousQueryAsyncClosure(
+            boolean primary,
+            CacheContinuousQueryEvent<K, V> evt,
+            boolean recordIgniteEvt,
+            IgniteInternalFuture<?> fut) {
+            this.primary = primary;
+            this.evt = evt;
+            this.recordIgniteEvt = recordIgniteEvt;
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            final boolean notify = filter(evt, primary);
+
+            if (!primary())
+                return;
+
+            if (fut == null) {
+                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+
+                return;
+            }
+
+            if (fut.isDone()) {
+                if (fut.error() != null)
+                    evt.entry().markFiltered();
+
+                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        if (f.error() != null)
+                            evt.entry().markFiltered();
+
+                        ctx.asyncCallbackPool().execute(new Runnable() {
+                            @Override public void run() {
+                                onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt);
+                            }
+                        }, evt.entry().partition());
+                    }
+                });
+            }
+        }
+
+        /**
+         * @return {@code True} if event fired on this node.
+         */
+        private boolean primary() {
+            return primary || skipPrimaryCheck;
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(ContinuousQueryAsyncClosure.class, this);
+        }
+    }
+
+    /**
      * Deployable object.
      */
     protected static class DeployableObject implements Externalizable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 83ff32c..8eca81c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous query listener.
@@ -36,8 +38,10 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param evt Event
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
+     * @param fut Dht atomic future.
      */
-    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+        boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
 
     /**
      * Listener unregistered callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index f9c33c1..fafb830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -54,11 +54,13 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -166,23 +168,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param key Entry key.
      * @param partId Partition id.
      * @param updCntr Updated counter.
-     * @param topVer Topology version.
-     */
-    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
-        KeyCacheObject key,
-        int partId,
-        long updCntr,
-        AffinityTopologyVersion topVer) {
-        skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer);
-    }
-
-    /**
-     * @param lsnrs Listeners to notify.
-     * @param key Entry key.
-     * @param partId Partition id.
-     * @param updCntr Updated counter.
-     * @param topVer Topology version.
      * @param primary Primary.
+     * @param topVer Topology version.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
@@ -241,6 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
+     * @param fut Dht atomic future.
      * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
@@ -253,7 +241,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
-        AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        @Nullable GridDhtAtomicUpdateFuture fut,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
         Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
 
         if (lsnrCol != null) {
@@ -267,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 primary,
                 preload,
                 updateCntr,
+                fut,
                 topVer);
         }
     }
@@ -282,6 +273,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
+     * @param fut Dht atomic future.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(
@@ -294,6 +286,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
@@ -347,7 +340,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
         }
     }
 
@@ -401,7 +394,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null);
             }
         }
     }
@@ -511,7 +504,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         return executeQuery0(
             locLsnr,
             new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) {
+                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -801,6 +794,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
      */
     private class JCacheQuery {
         /** */
@@ -932,9 +926,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /**
      *
      */
-    private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
+    static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> {
         /** */
-        private final CacheEntryListener<K, V> impl;
+        final CacheEntryListener<K, V> impl;
 
         /** */
         private final IgniteLogger log;
@@ -958,28 +952,28 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 try {
                     switch (evt.getEventType()) {
                         case CREATED:
-                            assert impl instanceof CacheEntryCreatedListener;
+                            assert impl instanceof CacheEntryCreatedListener : evt;
 
                             ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt));
 
                             break;
 
                         case UPDATED:
-                            assert impl instanceof CacheEntryUpdatedListener;
+                            assert impl instanceof CacheEntryUpdatedListener : evt;
 
                             ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt));
 
                             break;
 
                         case REMOVED:
-                            assert impl instanceof CacheEntryRemovedListener;
+                            assert impl instanceof CacheEntryRemovedListener : evt;
 
                             ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt));
 
                             break;
 
                         case EXPIRED:
-                            assert impl instanceof CacheEntryExpiredListener;
+                            assert impl instanceof CacheEntryExpiredListener : evt;
 
                             ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt));
 
@@ -1010,6 +1004,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
             return evts;
         }
+
+        /**
+         * @return {@code True} if listener should be executed in non-system thread.
+         */
+        protected boolean async() {
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
     }
 
     /**
@@ -1020,7 +1021,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         private static final long serialVersionUID = 0L;
 
         /** */
-        private CacheEntryEventFilter impl;
+        protected CacheEntryEventFilter impl;
 
         /** */
         private byte types;
@@ -1073,6 +1074,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         /**
+         * @return {@code True} if filter should be executed in non-system thread.
+         */
+        protected boolean async() {
+            return U.hasAnnotation(impl, IgniteAsyncCallback.class);
+        }
+
+        /**
          * @param evtType Type.
          * @return Flag value.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
new file mode 100644
index 0000000..1e04ce6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * If a callback has this annotation then it will be executed in another thread.
+ * <p>
+ * Currently this annotation is supported for:
+ * <ol>
+ *     <li>{@link ContinuousQuery} - {@link CacheEntryUpdatedListener} and {@link CacheEntryEventFilter}.</li>
+ * </ol>
+ * <p>
+ * For example, if a {@link CacheEntryEventFilter filter} or a {@link CacheEntryListener}
+ * has the annotation then its callbacks will be executed in the asyncCallback thread pool. This allows the cache API
+ * to be used in callbacks. This thread pool can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}.
+ * <h1 class="header">Example</h1>
+ * As an example, suppose we have a cache with {@code 'Person'} objects and we need
+ * to query all persons with a salary above 1000. The remote filter will also update some entries.
+ * <p>
+ * Here is the {@code Person} class:
+ * <pre name="code" class="java">
+ * public class Person {
+ *     // Name.
+ *     private String name;
+ *
+ *     // Salary.
+ *     private double salary;
+ *
+ *     ...
+ * }
+ * </pre>
+ * <p>
+ * Here is the {@code ExampleCacheEntryFilter} class:
+ * <pre name="code" class="java">
+ * &#064;IgniteAsyncCallback
+ * public class ExampleCacheEntryFilter implements CacheEntryEventFilter&lt;Long, Person&gt; {
+ *     &#064;IgniteInstanceResource
+ *     private Ignite ignite;
+ *
+ *     // The continuous listener will be notified for persons with a salary above 1000.
+ *     // The filter increases the salary of all other persons by 100. Without the &#064;IgniteAsyncCallback annotation
+ *     // this operation is not safe.
+ *     public boolean evaluate(CacheEntryEvent&lt;? extends Long, ? extends Person&gt; evt) throws CacheEntryListenerException {
+ *         Person p = evt.getValue();
+ *
+ *         if (p.getSalary() &gt; 1000)
+ *             return true;
+ *
+ *         ignite.cache("Person").put(evt.getKey(), new Person(p.getName(), p.getSalary() + 100));
+ *
+ *         return false;
+ *     }
+ * }
+ * </pre>
+ * <p>
+ * A query with an asynchronous callback is executed as usual:
+ * <pre name="code" class="java">
+ * // Create new continuous query.
+ * ContinuousQuery&lt;Long, Person&gt; qry = new ContinuousQuery&lt;&gt;();
+ *
+ * // Callback that is called locally when update notifications are received.
+ * // It simply prints out information about all created persons.
+ * qry.setLocalListener((evts) -> {
+ *     for (CacheEntryEvent&lt;? extends Long, ? extends Person&gt; e : evts) {
+ *         Person p = e.getValue();
+ *
+ *         System.out.println(p.getName() + "'s salary is " + p.getSalary());
+ *     }
+ * });
+ *
+ * // Sets remote filter.
+ * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter());
+ *
+ * // Execute query.
+ * QueryCursor&lt;Cache.Entry&lt;Long, Person&gt;&gt; cur = cache.query(qry);
+ * </pre>
+ *
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize
+ * @see ContinuousQuery#getRemoteFilterFactory()
+ * @see ContinuousQuery#getLocalListener()
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface IgniteAsyncCallback {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 35882b9..9f7c381 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -17,62 +17,63 @@
 
 package org.apache.ignite.thread;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
  */
 public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     /** */
-    public static final int DFLT_SEG_POOL_SIZE = 8;
-
-    /** */
-    public static final int DFLT_CONCUR_LVL = 16;
-
-    /** */
     private final ExecutorService[] execs;
 
-    /** */
-    private final int segShift;
-
-    /** */
-    private final int segMask;
-
     /**
+     * Create striped thread pool.
      *
+     * @param concurrentLvl Concurrency level.
+     * @param gridName Node name.
+     * @param threadNamePrefix Thread name prefix.
      */
-    public IgniteStripedThreadPoolExecutor() {
-        execs = new ExecutorService[DFLT_CONCUR_LVL];
-
-        ThreadFactory factory = new IgniteThreadFactory(null);
-
-        for (int i = 0; i < DFLT_CONCUR_LVL; i++)
-            execs[i] = Executors.newFixedThreadPool(DFLT_SEG_POOL_SIZE, factory);
+    public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) {
+        execs = new ExecutorService[concurrentLvl];
 
-        // Find power-of-two sizes best matching arguments
-        int sshift = 0;
-        int ssize = 1;
+        ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix);
 
-        while (ssize < DFLT_CONCUR_LVL) {
-            ++sshift;
-
-            ssize <<= 1;
-        }
+        for (int i = 0; i < concurrentLvl; i++)
+            execs[i] = Executors.newSingleThreadExecutor(factory);
+    }
 
-        segShift = 32 - sshift;
-        segMask = ssize - 1;
+    /**
+     * Executes the given command at some time in the future. Commands with the same {@code idx}
+     * will be executed in the same thread.
+     *
+     * @param task the runnable task
+     * @param idx Striped index.
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution.
+     * @throws NullPointerException If command is null
+     */
+    public void execute(Runnable task, int idx) {
+        execs[threadId(idx)].execute(task);
+    }
 
+    /**
+     * @param idx Index.
+     * @return Striped thread ID.
+     */
+    public int threadId(int idx) {
+        return idx < execs.length ? idx : idx % execs.length;
     }
 
     /** {@inheritDoc} */
@@ -83,7 +84,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
 
     /** {@inheritDoc} */
     @Override public List<Runnable> shutdownNow() {
-        List<Runnable> res = new LinkedList<>();
+        if (execs.length == 0)
+            return Collections.emptyList();
+
+        List<Runnable> res = new ArrayList<>(execs.length);
 
         for (ExecutorService exec : execs) {
             for (Runnable r : exec.shutdownNow())
@@ -124,105 +128,45 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Callable<T> task) {
-        return execForTask(task).submit(task);
+    @NotNull @Override public <T> Future<T> submit(Callable<T> task) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Runnable task, T result) {
-        return execForTask(task).submit(task, result);
+    @NotNull @Override public <T> Future<T> submit(Runnable task, T res) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> submit(Runnable task) {
-        return execForTask(task).submit(task);
+    @NotNull @Override public Future<?> submit(Runnable task) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        List<Future<T>> futs = new LinkedList<>();
-
-        for (Callable<T> task : tasks)
-            futs.add(execForTask(task).submit(task));
-
-        boolean done = false;
-
-        try {
-            for (Future<T> fut : futs) {
-                try {
-                    fut.get();
-                }
-                catch (ExecutionException | InterruptedException ignored) {
-                    // No-op.
-                }
-            }
-
-            done = true;
-
-            return futs;
-        }
-        finally {
-            if (!done) {
-                for (Future<T> fut : futs)
-                    fut.cancel(true);
-            }
-        }
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
-        TimeUnit unit) throws InterruptedException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+        long timeout,
+        TimeUnit unit) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
-        ExecutionException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        throw new RuntimeException("Not implemented.");
+    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void execute(Runnable cmd) {
-        execForTask(cmd).execute(cmd);
-    }
-
-    /**
-     * Applies a supplemental hash function to a given hashCode, which
-     * defends against poor quality hash functions.  This is critical
-     * because ConcurrentHashMap uses power-of-two length hash tables,
-     * that otherwise encounter collisions for hashCodes that do not
-     * differ in lower or upper bits.
-     *
-     * @param h Hash code.
-     * @return Enhanced hash code.
-     */
-    private int hash(int h) {
-        // Spread bits to regularize both segment and index locations,
-        // using variant of single-word Wang/Jenkins hash.
-        h += (h <<  15) ^ 0xffffcd7d;
-        h ^= (h >>> 10);
-        h += (h <<   3);
-        h ^= (h >>>  6);
-        h += (h <<   2) + (h << 14);
-        return h ^ (h >>> 16);
-    }
-
-    /**
-     * @param cmd Command.
-     * @return Service.
-     */
-    private <T> ExecutorService execForTask(T cmd) {
-        assert cmd != null;
-
-        //return execs[ThreadLocalRandom8.current().nextInt(DFLT_CONCUR_LVL)];
-        return execs[(hash(cmd.hashCode()) >>> segShift) & segMask];
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 4455b46..09cf7c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -540,7 +541,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr) throws IgniteCheckedException,
+        @Nullable Long updateCntr,
+        @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
new file mode 100644
index 0000000..62fd984
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest
+    extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
new file mode 100644
index 0000000..4460498
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
new file mode 100644
index 0000000..8f0bd0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}


Mime
View raw message