ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shro...@apache.org
Subject [03/50] [abbrv] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Tue, 01 Nov 2016 02:37:22 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
new file mode 100644
index 0000000..dbaafe1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
@@ -0,0 +1,725 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheContinuousQueryRandomOperationsTest {
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 40;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInternalQuery() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
+
+        UUID uuid = null;
+
+        try {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            final CountDownLatch latch = new CountDownLatch(5);
+
+            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    for (Object evt : iterable) {
+                        latch.countDown();
+
+                        log.info("Received event: " + evt);
+                    }
+                }
+            };
+
+            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+            for (int i = 10; i < 20; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, SECONDS));
+        }
+        finally {
+            if (uuid != null)
+                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                    .cancelInternalQuery(uuid);
+
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+            if (deploy == CLIENT)
+                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+            else if (deploy == SERVER)
+                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+                    rnd.nextBoolean()));
+            else {
+                boolean isSync = rnd.nextBoolean();
+
+                for (int i = 0; i < NODES - 1; i++)
+                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+            }
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 10 == 0)
+                        log.info("Iteration: " + i);
+
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+                }
+            }
+            finally {
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
+
+                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param nodeIdx Node index.
+     * @param curs Cursors.
+     * @param lsnrCfgs Listener configurations.
+     * @return Event queue
+     */
+    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+        int nodeIdx,
+        Collection<QueryCursor<?>> curs,
+        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+        boolean sync) {
+        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    }),
+                    createFilterFactory(),
+                    true,
+                    sync
+                );
+
+            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+        }
+        else {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<?, ?> evt : evts)
+                        evtsQueue.add(evt);
+                }
+            });
+
+            qry.setRemoteFilterFactory(createFilterFactory());
+
+            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+            curs.add(cur);
+        }
+
+        return evtsQueue;
+    }
+
+    /**
+     * @return Filter factory.
+     */
+    @NotNull protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> createFilterFactory() {
+        return new FilterFactory();
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueues Events queue.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueues);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(evtsQueues);
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
+        if ((val == null && oldVal == null
+            || (val != null && !isAccepted((QueryTestValue)val)))) {
+            checkNoEvent(evtsQueues);
+
+            return;
+        }
+
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
+
+            long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+            assertNull(evt);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class NonSerializableFilter
+        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+        /** */
+        public NonSerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            return isAccepted(evt.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /**
+         * @param val Value.
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+        /** */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+            throws CacheEntryListenerException {
+            return isAccepted(evt.getValue());
+        }
+
+        /**
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(Integer val) {
+            return val == null || val % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class FilterFactory implements Factory<NonSerializableFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     *
+     */
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+        Externalizable {
+        /** */
+        public LocalNonSerialiseListener() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+            onEvents(evts);
+        }
+
+        /**
+         * @param evts Events.
+         */
+        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
deleted file mode 100644
index 55340d5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java
+++ /dev/null
@@ -1,714 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.cache.configuration.CacheEntryListenerConfiguration;
-import javax.cache.configuration.Factory;
-import javax.cache.configuration.FactoryBuilder;
-import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
-import javax.cache.event.CacheEntryCreatedListener;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryExpiredListener;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryRemovedListener;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.CacheQueryEntryEvent;
-import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
-import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
-import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
-
-/**
- *
- */
-public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest {
-    /** */
-    private static final int NODES = 5;
-
-    /** */
-    private static final int KEYS = 50;
-
-    /** */
-    private static final int VALS = 10;
-
-    /** */
-    public static final int ITERATION_CNT = 40;
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testInternalQuery() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
-            1,
-            ATOMIC,
-            ONHEAP_TIERED,
-            false);
-
-        final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg);
-
-        UUID uuid = null;
-
-        try {
-            for (int i = 0; i < 10; i++)
-                cache.put(i, i);
-
-            final CountDownLatch latch = new CountDownLatch(5);
-
-            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
-                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
-                    for (Object evt : iterable) {
-                        latch.countDown();
-
-                        log.info("Received event: " + evt);
-                    }
-                }
-            };
-
-            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
-                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
-
-            for (int i = 10; i < 20; i++)
-                cache.put(i, i);
-
-            assertTrue(latch.await(3, SECONDS));
-        }
-        finally {
-            if (uuid != null)
-                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
-                    .cancelInternalQuery(uuid);
-
-            grid(0).destroyCache(ccfg.getName());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
-        throws Exception {
-        ignite(0).createCache(ccfg);
-
-        try {
-            long seed = System.currentTimeMillis();
-
-            Random rnd = new Random(seed);
-
-            log.info("Random seed: " + seed);
-
-            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
-
-            Collection<QueryCursor<?>> curs = new ArrayList<>();
-
-            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
-
-            if (deploy == CLIENT)
-                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
-            else if (deploy == SERVER)
-                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
-                    rnd.nextBoolean()));
-            else {
-                boolean isSync = rnd.nextBoolean();
-
-                for (int i = 0; i < NODES - 1; i++)
-                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
-            }
-
-            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
-
-            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
-
-            try {
-                for (int i = 0; i < ITERATION_CNT; i++) {
-                    if (i % 10 == 0)
-                        log.info("Iteration: " + i);
-
-                    for (int idx = 0; idx < NODES; idx++)
-                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
-                }
-            }
-            finally {
-                for (QueryCursor<?> cur : curs)
-                    cur.close();
-
-                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
-                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
-            }
-        }
-        finally {
-            ignite(0).destroyCache(ccfg.getName());
-        }
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param nodeIdx Node index.
-     * @param curs Cursors.
-     * @param lsnrCfgs Listener configurations.
-     * @return Event queue
-     */
-    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
-        int nodeIdx,
-        Collection<QueryCursor<?>> curs,
-        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
-        boolean sync) {
-        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
-
-        if (ThreadLocalRandom.current().nextBoolean()) {
-            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
-                new MutableCacheEntryListenerConfiguration<>(
-                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
-                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                            ? extends QueryTestValue>> evts) {
-                            for (CacheEntryEvent<?, ?> evt : evts)
-                                evtsQueue.add(evt);
-                        }
-                    }),
-                    new FilterFactory(),
-                    true,
-                    sync
-                );
-
-            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
-
-            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
-        }
-        else {
-            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
-
-            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-                    for (CacheEntryEvent<?, ?> evt : evts)
-                        evtsQueue.add(evt);
-                }
-            });
-
-            qry.setRemoteFilterFactory(new FilterFactory());
-
-            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
-
-            curs.add(cur);
-        }
-
-        return evtsQueue;
-    }
-
-    /**
-     * @param rnd Random generator.
-     * @param evtsQueues Events queue.
-     * @param expData Expected cache data.
-     * @param partCntr Partition counter.
-     * @param cache Cache.
-     * @throws Exception If failed.
-     */
-    private void randomUpdate(
-        Random rnd,
-        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
-        ConcurrentMap<Object, Object> expData,
-        Map<Integer, Long> partCntr,
-        IgniteCache<Object, Object> cache)
-        throws Exception {
-        Object key = new QueryTestKey(rnd.nextInt(KEYS));
-        Object newVal = value(rnd);
-        Object oldVal = expData.get(key);
-
-        int op = rnd.nextInt(11);
-
-        Ignite ignite = cache.unwrap(Ignite.class);
-
-        Transaction tx = null;
-
-        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
-            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
-
-        try {
-            // log.info("Random operation [key=" + key + ", op=" + op + ']');
-
-            switch (op) {
-                case 0: {
-                    cache.put(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                    expData.put(key, newVal);
-
-                    break;
-                }
-
-                case 1: {
-                    cache.getAndPut(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                    expData.put(key, newVal);
-
-                    break;
-                }
-
-                case 2: {
-                    cache.remove(key);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
-
-                    expData.remove(key);
-
-                    break;
-                }
-
-                case 3: {
-                    cache.getAndRemove(key);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
-
-                    expData.remove(key);
-
-                    break;
-                }
-
-                case 4: {
-                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                    expData.put(key, newVal);
-
-                    break;
-                }
-
-                case 5: {
-                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
-
-                    if (tx != null)
-                        tx.commit();
-
-                    updatePartitionCounter(cache, key, partCntr);
-
-                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
-
-                    expData.remove(key);
-
-                    break;
-                }
-
-                case 6: {
-                    cache.putIfAbsent(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    if (oldVal == null) {
-                        updatePartitionCounter(cache, key, partCntr);
-
-                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
-
-                        expData.put(key, newVal);
-                    }
-                    else
-                        checkNoEvent(evtsQueues);
-
-                    break;
-                }
-
-                case 7: {
-                    cache.getAndPutIfAbsent(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    if (oldVal == null) {
-                        updatePartitionCounter(cache, key, partCntr);
-
-                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
-
-                        expData.put(key, newVal);
-                    }
-                    else
-                        checkNoEvent(evtsQueues);
-
-                    break;
-                }
-
-                case 8: {
-                    cache.replace(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    if (oldVal != null) {
-                        updatePartitionCounter(cache, key, partCntr);
-
-                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                        expData.put(key, newVal);
-                    }
-                    else
-                        checkNoEvent(evtsQueues);
-
-                    break;
-                }
-
-                case 9: {
-                    cache.getAndReplace(key, newVal);
-
-                    if (tx != null)
-                        tx.commit();
-
-                    if (oldVal != null) {
-                        updatePartitionCounter(cache, key, partCntr);
-
-                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                        expData.put(key, newVal);
-                    }
-                    else
-                        checkNoEvent(evtsQueues);
-
-                    break;
-                }
-
-                case 10: {
-                    if (oldVal != null) {
-                        Object replaceVal = value(rnd);
-
-                        boolean success = replaceVal.equals(oldVal);
-
-                        if (success) {
-                            cache.replace(key, replaceVal, newVal);
-
-                            if (tx != null)
-                                tx.commit();
-
-                            updatePartitionCounter(cache, key, partCntr);
-
-                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
-
-                            expData.put(key, newVal);
-                        }
-                        else {
-                            cache.replace(key, replaceVal, newVal);
-
-                            if (tx != null)
-                                tx.commit();
-
-                            checkNoEvent(evtsQueues);
-                        }
-                    }
-                    else {
-                        cache.replace(key, value(rnd), newVal);
-
-                        if (tx != null)
-                            tx.commit();
-
-                        checkNoEvent(evtsQueues);
-                    }
-
-                    break;
-                }
-
-                default:
-                    fail("Op:" + op);
-            }
-        } finally {
-            if (tx != null)
-                tx.close();
-        }
-    }
-
-    /**
-     * @param rnd {@link Random}.
-     * @return {@link TransactionIsolation}.
-     */
-    private TransactionIsolation txRandomIsolation(Random rnd) {
-        int val = rnd.nextInt(3);
-
-        if (val == 0)
-            return READ_COMMITTED;
-        else if (val == 1)
-            return REPEATABLE_READ;
-        else
-            return SERIALIZABLE;
-    }
-
-    /**
-     * @param rnd {@link Random}.
-     * @return {@link TransactionConcurrency}.
-     */
-    private TransactionConcurrency txRandomConcurrency(Random rnd) {
-        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
-    }
-
-    /**
-     * @param cache Cache.
-     * @param key Key
-     * @param cntrs Partition counters.
-     */
-    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
-        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
-
-        int part = aff.partition(key);
-
-        Long partCntr = cntrs.get(part);
-
-        if (partCntr == null)
-            partCntr = 0L;
-
-        cntrs.put(part, ++partCntr);
-    }
-
-    /**
-     * @param rnd Random generator.
-     * @return Cache value.
-     */
-    private static Object value(Random rnd) {
-        return new QueryTestValue(rnd.nextInt(VALS));
-    }
-
-    /**
-     * @param evtsQueues Event queue.
-     * @param partCntrs Partition counters.
-     * @param aff Affinity function.
-     * @param key Key.
-     * @param val Value.
-     * @param oldVal Old value.
-     * @throws Exception If failed.
-     */
-    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
-        Map<Integer, Long> partCntrs,
-        Affinity<Object> aff,
-        Object key,
-        Object val,
-        Object oldVal)
-        throws Exception {
-        if ((val == null && oldVal == null
-            || (val != null && !isAccepted((QueryTestValue)val)))) {
-            checkNoEvent(evtsQueues);
-
-            return;
-        }
-
-        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
-            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
-
-            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
-            assertEquals(key, evt.getKey());
-            assertEquals(val, evt.getValue());
-            assertEquals(oldVal, evt.getOldValue());
-
-            long cntr = partCntrs.get(aff.partition(key));
-            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
-
-            assertNotNull(cntr);
-            assertNotNull(qryEntryEvt);
-
-            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
-        }
-    }
-
-    /**
-     * @param evtsQueues Event queue.
-     * @throws Exception If failed.
-     */
-    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
-        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
-            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
-
-            assertNull(evt);
-        }
-    }
-
-    /**
-     *
-     */
-    protected static class NonSerializableFilter
-        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
-            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
-        /** */
-        public NonSerializableFilter() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event)
-            throws CacheEntryListenerException {
-            return isAccepted(event.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            fail("Entry filter should not be marshaled.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            fail("Entry filter should not be marshaled.");
-        }
-
-        /**
-         * @return {@code True} if value is even.
-         */
-        public static boolean isAccepted(QueryTestValue val) {
-            return val == null || val.val1 % 2 == 0;
-        }
-    }
-
-    /**
-     *
-     */
-    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{
-        /** */
-        public SerializableFilter() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event)
-            throws CacheEntryListenerException {
-            return isAccepted(event.getValue());
-        }
-
-        /**
-         * @return {@code True} if value is even.
-         */
-        public static boolean isAccepted(Integer val) {
-            return val == null || val % 2 == 0;
-        }
-    }
-
-    /**
-     *
-     */
-    protected static class FilterFactory implements Factory<NonSerializableFilter> {
-        @Override public NonSerializableFilter create() {
-            return new NonSerializableFilter();
-        }
-    }
-
-    /**
-     *
-     */
-    public abstract class LocalNonSerialiseListener implements
-        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
-        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
-        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
-        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
-        Externalizable {
-        /** */
-        public LocalNonSerialiseListener() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-            onEvents(evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-            onEvents(evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-            onEvents(evts);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
-            onEvents(evts);
-        }
-
-        /**
-         * @param evts Events.
-         */
-        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
-            ? extends QueryTestValue>> evts);
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 4226537..083367c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -99,7 +100,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -167,6 +168,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * @return Async callback flag.
+     */
+    protected boolean asyncCallback() {
+        return false;
+    }
+
+    /**
      * @return Near cache configuration.
      */
     protected NearCacheConfiguration nearCacheConfiguration() {
@@ -476,7 +484,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         for (int j = 0; j < 50; ++j) {
             ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-            final CacheEventListener3 lsnr = new CacheEventListener3();
+            final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3()
+                : new CacheEventListener3();
 
             qry.setLocalListener(lsnr);
 
@@ -560,7 +569,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final CacheEventListener3 lsnr = new CacheEventListener3();
+        final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -721,7 +730,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final CacheEventListener3 lsnr = new CacheEventListener3();
+        final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -841,7 +850,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         Affinity<Object> aff = qryClient.affinity(null);
 
-        CacheEventListener1 lsnr = new CacheEventListener1(false);
+        CacheEventListener1 lsnr = asyncCallback() ? new CacheEventAsyncListener1(false)
+            : new CacheEventListener1(false);
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -1545,7 +1555,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         qry.setLocalListener(lsnr);
 
-        qry.setRemoteFilter(new CacheEventFilter());
+        qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter());
 
         QueryCursor<?> cur = qryClnCache.query(qry);
 
@@ -1639,7 +1649,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
                     newQry.setLocalListener(dinLsnr);
 
-                    newQry.setRemoteFilter(new CacheEventFilter());
+                    newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter());
 
                     dinQry = qryClnCache.query(newQry);
 
@@ -1786,7 +1796,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
 
-        final CacheEventListener2 lsnr = new CacheEventListener2();
+        final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -2144,6 +2154,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncListener1 extends CacheEventListener1 {
+        /**
+         * @param saveAll Save all events flag.
+         */
+        CacheEventAsyncListener1(boolean saveAll) {
+            super(saveAll);
+        }
+    }
+
+    /**
+     *
+     */
     private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
         /** */
         private volatile CountDownLatch latch;
@@ -2208,6 +2231,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncListener2 extends CacheEventListener2 {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
         /** */
         @LoggerResource
@@ -2275,6 +2306,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    public static class CacheEventAsyncListener3 extends CacheEventListener3 {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>,
         CacheEntryEventSerializableFilter<Object, Object> {
         /** Keys. */
@@ -2303,6 +2342,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncFilter extends CacheEventFilter {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index 025dd80..b469a86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -60,6 +60,12 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
         super.beforeTest();
 
         startGridsMultiThreaded(2);
+
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return grid(0).cluster().nodes().size() == 2;
+            }
+        }, 10000L);
     }
 
     /** {@inheritDoc} */
@@ -140,6 +146,14 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
         // node2 now becomes the primary for the key.
         stopGrid(0);
 
+        final int prevSize = grid(1).cluster().nodes().size();
+
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return prevSize - 1 == grid(1).cluster().nodes().size();
+            }
+        }, 5000L);
+
         cache2.put(key, "2");
 
         // Sanity check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
new file mode 100644
index 0000000..0d027a9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ *
+ */
+public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbstractTest {
+    /** */
+    public static final int KEYS = 10;
+
+    /** */
+    public static final int KEYS_FROM_CALLBACK = 20;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    public static final int ITERATION_CNT = 20;
+
+    /** */
+    public static final int SYSTEM_POOL_SIZE = 10;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static AtomicInteger filterCbCntr = new AtomicInteger(0);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setSystemThreadPoolSize(SYSTEM_POOL_SIZE);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
+        storeSpi.setExpireCount(100);
+
+        cfg.setEventStorageSpi(storeSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        filterCbCntr.set(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicTwoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxTwoBackupsFilter() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxTwoBackupsFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, PRIMARY_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedFilter() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxTwoBackup() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, PRIMARY_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        List<QueryCursor<?>> qries = new ArrayList<>();
+
+        assertEquals(0, filterCbCntr.get());
+
+        try {
+            List<Set<T2<QueryTestKey, QueryTestValue>>> rcvdEvts = new ArrayList<>(NODES);
+            List<Set<T2<QueryTestKey, QueryTestValue>>> evtsFromCallbacks = new ArrayList<>(NODES);
+
+            final AtomicInteger qryCntr = new AtomicInteger(0);
+
+            final AtomicInteger cbCntr = new AtomicInteger(0);
+
+            final int threadCnt = SYSTEM_POOL_SIZE * 2;
+
+            for (int idx = 0; idx < NODES; idx++) {
+                Set<T2<QueryTestKey, QueryTestValue>> evts = Collections.
+                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());
+                Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = Collections.
+                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());
+
+                IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName());
+
+                ContinuousQuery qry = new ContinuousQuery();
+
+                qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCb,
+                    fromLsnr ? cache : null, qryCntr, cbCntr));
+
+                if (!fromLsnr)
+                    qry.setRemoteFilterFactory(
+                        FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(ccfg.getName())));
+
+                rcvdEvts.add(evts);
+                evtsFromCallbacks.add(evtsFromCb);
+
+                QueryCursor qryCursor = cache.query(qry);
+
+                qries.add(qryCursor);
+            }
+
+            IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < ITERATION_CNT; i++) {
+                        IgniteCache<QueryTestKey, QueryTestValue> cache =
+                            grid(rnd.nextInt(NODES)).cache(ccfg.getName());
+
+                        QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));
+
+                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
+                            TRANSACTIONAL && rnd.nextBoolean();
+
+                        Transaction tx = null;
+
+                        if (startTx)
+                            tx = cache.unwrap(Ignite.class).transactions().txStart();
+
+                        try {
+                            if ((cache.get(key) == null) || rnd.nextBoolean())
+                                cache.invoke(key, new IncrementTestEntryProcessor());
+                            else {
+                                QueryTestValue val;
+                                QueryTestValue newVal;
+
+                                do {
+                                    val = cache.get(key);
+
+                                    newVal = val == null ?
+                                        new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+                                }
+                                while (!cache.replace(key, val, newVal));
+                            }
+                        }
+                        finally {
+                            if (tx != null)
+                                tx.commit();
+                        }
+                    }
+                }
+            }, threadCnt, "put-thread");
+
+            f.get(30, TimeUnit.SECONDS);
+
+            assert GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES;
+                }
+            }, TimeUnit.MINUTES.toMillis(2));
+
+            for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts)
+                checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false);
+
+            if (fromLsnr) {
+                final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK;
+
+                boolean res = GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return cbCntr.get() >= expCnt;
+                    }
+                }, TimeUnit.SECONDS.toMillis(60));
+
+                assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res);
+
+                assertEquals(expCnt, cbCntr.get());
+
+                for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks)
+                    checkEvents(set, qryCntr.get() * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true);
+            }
+            else {
+                final int expInvkCnt = ITERATION_CNT * threadCnt *
+                    (ccfg.getCacheMode() != REPLICATED ? (ccfg.getBackups() + 1) : NODES - 1) * NODES;
+
+                GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return filterCbCntr.get() >= expInvkCnt;
+                    }
+                }, TimeUnit.SECONDS.toMillis(60));
+
+                assertEquals(expInvkCnt, filterCbCntr.get());
+
+                for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks)
+                    checkEvents(set, expInvkCnt * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true);
+            }
+        }
+        finally {
+            for (QueryCursor<?> qry : qries)
+                qry.close();
+
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param expCnt Expected count.
+     * @param cache Cache.
+     * @param set Received events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final Set<T2<QueryTestKey, QueryTestValue>> set, final int expCnt, IgniteCache cache,
+        boolean cb) throws Exception {
+        assertTrue("Expected size: " + expCnt + ", actual: " + set.size(), GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return set.size() >= expCnt;
+            }
+        }, 10000L));
+
+        int startKey = cb ? KEYS : 0;
+        int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS;
+
+        for (int i = startKey; i < endKey; i++) {
+            QueryTestKey key = new QueryTestKey(i);
+
+            QueryTestValue maxVal = (QueryTestValue)cache.get(key);
+
+            for (int val = 0; val <= maxVal.val1; val++)
+                assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
+        }
+
+        assertTrue(set.isEmpty());
+    }
+
+    /**
+     *
+     */
+    private static class IncrementTestEntryProcessor implements
+        CacheEntryProcessor<QueryTestKey, QueryTestValue, Object> {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<QueryTestKey, QueryTestValue> entry, Object... arguments)
+            throws EntryProcessorException {
+            if (entry.exists())
+                entry.setValue(new QueryTestValue(entry.getValue().val1 + 1));
+            else
+                entry.setValue(new QueryTestValue(0));
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    private static class CacheTestRemoteFilterAsync implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public CacheTestRemoteFilterAsync(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
+            throws CacheEntryListenerException {
+            if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+                IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName);
+
+                if (ThreadLocalRandom.current().nextBoolean()) {
+                    Set<QueryTestKey> keys = new LinkedHashSet<>();
+
+                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
+                        keys.add(new QueryTestKey(key));
+
+                    cache.invokeAll(keys, new IncrementTestEntryProcessor());
+                }
+                else {
+                    for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
+                        cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+                }
+
+                filterCbCntr.incrementAndGet();
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    private static class TestCacheAsyncEventListener
+        implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
+        /** */
+        private final Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts;
+
+        /** */
+        private final AtomicInteger cntr;
+
+        /** */
+        private final AtomicInteger cbCntr;
+
+        /** */
+        private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb;
+
+        /** */
+        private IgniteCache<QueryTestKey, QueryTestValue> cache;
+
+        /**
+         * @param rcvsEvts Set for received events.
+         * @param evtsFromCb Set for received events.
+         * @param cache Ignite cache.
+         * @param cntr Received events counter.
+         * @param cbCntr Received events counter from callbacks.
+         */
+        public TestCacheAsyncEventListener(Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts,
+            Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb,
+            @Nullable IgniteCache cache,
+            AtomicInteger cntr,
+            AtomicInteger cbCntr) {
+            this.rcvsEvts = rcvsEvts;
+            this.evtsFromCb = evtsFromCb;
+            this.cache = cache;
+            this.cntr = cntr;
+            this.cbCntr = cbCntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) {
+                if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
+                    rcvsEvts.add(new T2<>(e.getKey(), e.getValue()));
+
+                    cntr.incrementAndGet();
+
+                    if (cache != null) {
+                        if (ThreadLocalRandom.current().nextBoolean()) {
+                            Set<QueryTestKey> keys = new LinkedHashSet<>();
+
+                            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
+                                keys.add(new QueryTestKey(key));
+
+                            cache.invokeAll(keys, new IncrementTestEntryProcessor());
+                        }
+                        else {
+                            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
+                                cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
+                        }
+                    }
+                }
+                else {
+                    evtsFromCb.add(new T2<>(e.getKey(), e.getValue()));
+
+                    cbCntr.incrementAndGet();
+                }
+            }
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param writeMode Write sync mode.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheWriteSynchronizationMode writeMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + writeMode + "-" + backups);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setWriteSynchronizationMode(writeMode);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            return key - ((QueryTestKey)o).key;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class QueryTestValue implements Serializable {
+        /** */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}


Mime
View raw message