ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [03/43] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Thu, 19 May 2016 09:37:36 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
new file mode 100644
index 0000000..0605bc8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -0,0 +1,986 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of nodes started; the last one is started with the client flag set. */
+    private static final int NODES = 5;
+
+    /** Number of iterations performed by each test scenario. */
+    public static final int ITERATION_CNT = 100;
+
+    /** Client mode flag applied to configurations built by {@link #getConfiguration(String)}. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // All nodes of the test share the same IP finder.
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        // Current value of the flag decides whether the node starts as a client.
+        igniteCfg.setClientMode(client);
+
+        // In-memory event storage with an expire count of 1000.
+        MemoryEventStorageSpi evtSpi = new MemoryEventStorageSpi();
+
+        evtSpi.setExpireCount(1000);
+
+        igniteCfg.setEventStorageSpi(evtSpi);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Index of the last node; all nodes before it are started first, with the client flag unset.
+        final int lastIdx = NODES - 1;
+
+        startGridsMultiThreaded(lastIdx);
+
+        // From now on getConfiguration() produces client configurations.
+        client = true;
+
+        startGrid(lastIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Stop every node started in beforeTestsStarted() before delegating to the parent.
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST LISTENER.
+    ///
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTx() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeap() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerTxOffHeapValues() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomic() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomic() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * Listener scenario: REPLICATED ATOMIC OFFHEAP_VALUES cache, async filter and listener,
+     * registration through a continuous query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception {
+        // Memory mode must match the test name; ONHEAP_TIERED here merely repeated the ReplicatedAtomic test.
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicOffHeap() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * Listener scenario: PARTITIONED ATOMIC OFFHEAP_VALUES cache, async filter and listener,
+     * registration through a continuous query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception {
+        // Memory mode must match the test name; OFFHEAP_TIERED here merely repeated the AtomicOffHeap test.
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListener() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicated() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInListenerReplicatedJCacheApi() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
+    }
+
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST FILTER.
+    ///
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTx() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeap() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapValues() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomic() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedAtomic() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeap() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, true);
+    }
+
+    /**
+     * Filter scenario: PARTITIONED ATOMIC OFFHEAP_VALUES cache, async filter and listener,
+     * registration through a continuous query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception {
+        // Memory mode must match the test name; OFFHEAP_TIERED here merely repeated the AtomicOffHeap test.
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilter() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicated() throws Exception {
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false);
+    }
+
+    /**
+     * Filter scenario: REPLICATED TRANSACTIONAL ONHEAP_TIERED cache, async filter and listener,
+     * registration through the JCache listener API.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception {
+        // Last flag selects the JCache registration path; with 'false' this test repeated
+        // testNonDeadLockInFilterReplicated and never exercised the JCache API.
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true);
+    }
+
+    ///
+    /// ASYNC LISTENER. TEST LISTENER.
+    ///
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * Listener scenario with a synchronous filter and an async listener: PARTITIONED ATOMIC
+     * OFFHEAP_VALUES cache, registration through a continuous query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception {
+        // Memory mode must match the test name; OFFHEAP_TIERED here merely repeated the
+        // AtomicOffHeapSyncFilter test.
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false);
+    }
+
+    /**
+     * Checks that a local listener can update the cache from inside its callback without dead locking.
+     * <p>
+     * The cache is created up front and destroyed at the end. On each of {@link #ITERATION_CNT} iterations
+     * a listener is registered on the next node (round-robin over {@link #NODES}), an initial value is
+     * written, and the listener reacts to it by overwriting the key with a new value (inside a
+     * PESSIMISTIC / REPEATABLE_READ transaction when the cache is TRANSACTIONAL). An iteration passes when
+     * the nested update completes, the new value is readable and the listener has been notified about it.
+     *
+     * @param ccfg Cache configuration.
+     * @param asyncFltr Async filter.
+     * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for registration entry update listener.
+     * @throws Exception If failed.
+     */
+    private void testNonDeadLockInListener(CacheConfiguration ccfg,
+        final boolean asyncFltr,
+        boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Start iteration: " + i);
+
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                // The last node (started with the client flag) uses a fixed key, other nodes a key they are primary for.
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
+                // Released once the listener has completed its nested put.
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                // Released once the listener has been notified about the value written by the nested put.
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
+
+                // Filter only verifies the thread it is executed in.
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFltr) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+                        }
+                    };
+
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                // Event produced by the nested put below.
+                                evtFromLsnrLatch.countDown();
+
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
+
+                            Transaction tx = null;
+
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                assertEquals(val, val0);
+
+                                // Cache update issued from inside the listener callback.
+                                cache0.put(key, newVal);
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
+
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
+                        }
+                    };
+
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFltr ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
+
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
+
+                    qry = cache.query(conQry);
+                }
+
+                try {
+                    // Initial write is done either with put or with an entry processor, chosen randomly.
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else {
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
+                    }
+
+                    assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS));
+
+                    assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    // Wait on the listener-notification latch: re-awaiting 'latch' here always passed immediately.
+                    assertTrue("Failed to waiting event from listener.", U.await(evtFromLsnrLatch, 3, SECONDS));
+                }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
+
+                log.info("Iteration finished: " + i);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Checks that a remote filter can update the cache from inside its callback without dead locking.
+     * <p>
+     * The cache is created up front and destroyed at the end. On each of {@link #ITERATION_CNT} iterations
+     * a filter and a listener are registered on the next node (round-robin over {@link #NODES}), an initial
+     * value is written, and the filter reacts to it by overwriting the key with a new value (inside a
+     * PESSIMISTIC / REPEATABLE_READ transaction when the cache is TRANSACTIONAL). The listener releases the
+     * latch when it sees the initial value.
+     *
+     * @param ccfg Cache configuration.
+     * @param asyncFilter Async filter.
+     * @param asyncLsnr Async listener.
+     * @param jcacheApi Use JCache api for start update listener.
+     * @throws Exception If failed.
+     */
+    private void testNonDeadLockInFilter(CacheConfiguration ccfg,
+        final boolean asyncFilter,
+        final boolean asyncLsnr,
+        boolean jcacheApi) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Start iteration: " + i);
+
+                int nodeIdx = i % NODES;
+
+                final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName());
+
+                // The last node (started with the client flag) uses a fixed key, other nodes a key they are primary for.
+                final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1);
+
+                final QueryTestValue val0 = new QueryTestValue(1);
+                final QueryTestValue newVal = new QueryTestValue(2);
+
+                final CountDownLatch latch = new CountDownLatch(1);
+                final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1);
+
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFilter) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+
+                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null)
+                                return;
+                            else if (val.equals(newVal)) {
+                                // Event produced by the nested put below.
+                                evtFromLsnrLatch.countDown();
+
+                                return;
+                            }
+                            else if (!val.equals(val0))
+                                return;
+
+                            Transaction tx = null;
+
+                            try {
+                                if (cache0.getConfiguration(CacheConfiguration.class)
+                                    .getAtomicityMode() == TRANSACTIONAL)
+                                    tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+                                assertEquals(val, val0);
+
+                                // Cache update issued from inside the filter callback.
+                                cache0.put(key, newVal);
+
+                                if (tx != null)
+                                    tx.commit();
+
+                                latch.countDown();
+                            }
+                            catch (Exception exp) {
+                                log.error("Failed: ", exp);
+
+                                throw new IgniteException(exp);
+                            }
+                            finally {
+                                if (tx != null)
+                                    tx.close();
+                            }
+                        }
+                    };
+
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncLsnr) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("callback-"));
+                            }
+
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null || !val.equals(new QueryTestValue(1)))
+                                return;
+
+                            assertEquals(val, val0);
+
+                            // Listener releases the same latch once it sees the initial value.
+                            latch.countDown();
+                        }
+                    };
+
+                QueryCursor qry = null;
+                MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null;
+
+                CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr)
+                    : new CacheInvokeListener(lsnrClsr);
+
+                CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFilter ?
+                    new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr);
+
+                if (jcacheApi) {
+                    lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+                        FactoryBuilder.factoryOf(locLsnr),
+                        FactoryBuilder.factoryOf(rmtFltr),
+                        true,
+                        false
+                    );
+
+                    cache.registerCacheEntryListener(lsnrCfg);
+                }
+                else {
+                    ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>();
+
+                    conQry.setLocalListener(locLsnr);
+
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr));
+
+                    qry = cache.query(conQry);
+                }
+
+                try {
+                    // Initial write is done either with put or with an entry processor, chosen randomly.
+                    if (rnd.nextBoolean())
+                        cache.put(key, val0);
+                    else {
+                        cache.invoke(key, new CacheEntryProcessor() {
+                            @Override public Object process(MutableEntry entry, Object... arguments)
+                                throws EntryProcessorException {
+                                entry.setValue(val0);
+
+                                return null;
+                            }
+                        });
+                    }
+
+                    // JUnit assertion instead of the 'assert' keyword: the wait must happen even with -ea off.
+                    assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS));
+
+                    assertEquals(cache.get(key), new QueryTestValue(2));
+
+                    // NOTE(review): this re-awaits 'latch', which is already released at this point, while
+                    // evtFromLsnrLatch is never awaited. The filter may be executed on another node against a
+                    // marshalled copy of the latch, so confirm it is observable here before switching latches.
+                    assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS));
+                }
+                finally {
+                    if (qry != null)
+                        qry.close();
+
+                    if (lsnrCfg != null)
+                        cache.deregisterCacheEntryListener(lsnrCfg);
+                }
+
+                log.info("Iteration finished: " + i);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Looks for a key, among the first 10 000 integer based keys, for which the local node of the
+     * given cache is primary.
+     *
+     * @param cache Ignite cache.
+     * @return Key.
+     */
+    private QueryTestKey affinityKey(IgniteCache cache) {
+        Affinity aff = affinity(cache);
+
+        int idx = 0;
+
+        while (idx < 10_000) {
+            QueryTestKey candidate = new QueryTestKey(idx);
+
+            if (aff.isPrimary(localNode(cache), candidate))
+                return candidate;
+
+            idx++;
+        }
+
+        // No suitable key among the probed candidates.
+        throw new IgniteException("Failed to found primary key.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // 15 seconds expressed in milliseconds.
+        return SECONDS.toMillis(15);
+    }
+
+    /**
+     * Variant of {@link CacheTestRemoteFilter} marked with {@link IgniteAsyncCallback}. The tests expect
+     * such a filter to be executed in a thread whose name contains {@code "callback-"}.
+     */
+    @IgniteAsyncCallback
+    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter {
+        /**
+         * @param clsr Closure invoked for every evaluated event.
+         */
+        public CacheTestRemoteFilterAsync(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
+    /**
+     * Remote filter that hands every event, together with the injected {@link Ignite} instance, to a
+     * supplied closure and then accepts the event.
+     */
+    private static class CacheTestRemoteFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
+        /** Ignite instance injected through {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure applied to each event passed to {@link #evaluate(CacheEntryEvent)}. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure to run for every evaluated event.
+         */
+        public CacheTestRemoteFilter(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt)
+            throws CacheEntryListenerException {
+            clsr.apply(ignite, evt);
+
+            // The closure runs only for its side effects; the event itself is always accepted.
+            return true;
+        }
+    }
+
+    /**
+     * Variant of {@link CacheInvokeListener} that carries the {@link IgniteAsyncCallback} annotation.
+     * It adds no behavior of its own and only forwards the closure to the parent listener.
+     */
+    @IgniteAsyncCallback
+    private static class CacheInvokeListenerAsync extends CacheInvokeListener {
+        /**
+         * @param clsr Closure handed over to the parent listener.
+         */
+        public CacheInvokeListenerAsync(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
+    /**
+     * Listener for created and updated entries that passes each received event, together with the
+     * injected {@link Ignite} instance, to a supplied closure.
+     */
+    private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, Serializable {
+        /** Ignite instance injected through {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Closure applied to each received event. */
+        private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr;
+
+        /**
+         * @param clsr Closure to run for every received event.
+         */
+        public CacheInvokeListener(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) {
+            this.clsr = clsr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            applyToAll(events);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            applyToAll(events);
+        }
+
+        /**
+         * Applies the closure to every event in iteration order.
+         *
+         * @param evts Events received by the listener.
+         */
+        private void applyToAll(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts) {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt : evts)
+                clsr.apply(ignite, evt);
+        }
+    }
+
+    /**
+     * Builds a cache configuration whose name encodes the given parameters. Write synchronization is
+     * always {@code FULL_SYNC} and the atomic write order mode is always {@code PRIMARY}.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when the cache mode is {@code PARTITIONED}.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        String name = "test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups;
+
+        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+
+        cfg.setName(name);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setCacheMode(cacheMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (PARTITIONED == cacheMode)
+            cfg.setBackups(backups);
+
+        return cfg;
+    }
+
+    /**
+     * Cache key used by the test: a thin wrapper around an {@link Integer} whose equality, hash code and
+     * ordering are all derived from the wrapped value.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Compare instead of subtracting: the difference of two ints can overflow and flip the sign.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Cache value used by the test. It holds an {@link Integer} and the string form of the same number;
+     * both fields take part in equality and hashing.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Numeric value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** Result of {@link String#valueOf(Object)} for the numeric value. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            QueryTestValue other = (QueryTestValue)o;
+
+            if (!val1.equals(other.val1))
+                return false;
+
+            return val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
new file mode 100644
index 0000000..928cfda
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest
+    extends CacheContinuousQueryFactoryFilterRandomOperationTest {
+    /** {@inheritDoc} */
+    @NotNull @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>>
+        createFilterFactory() {
+        return new AsyncFilterFactory();
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    protected static class NonSerializableAsyncFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
+        /** */
+        public NonSerializableAsyncFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("callback-"));
+
+            assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("sys-"));
+
+            return isAccepted(evt.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /**
+         * @param val Value.
+         * @return {@code True} if value is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class AsyncFilterFactory implements Factory<NonSerializableAsyncFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableAsyncFilter create() {
+            return new NonSerializableAsyncFilter();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() {
+        return FactoryBuilder.factoryOf(NoopAsyncFilter.class);
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    protected static class NoopAsyncFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable {
+        /** */
+        public NoopAsyncFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("callback-"));
+
+            assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(),
+                Thread.currentThread().getName().contains("sys-"));
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
new file mode 100644
index 0000000..1178086
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java
@@ -0,0 +1,725 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest.NonSerializableFilter.isAccepted;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheContinuousQueryRandomOperationsTest {
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 40;
+
+    /**
+     * Starts an internal continuous query on a replicated atomic cache and checks that its listener is
+     * notified at least five times within three seconds after ten more entries are put.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInternalQuery() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+
+        UUID uuid = null;
+
+        try {
+            // These entries are written before the query is started.
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            // NOTE(review): ten puts follow but only five notifications are awaited; presumably
+            // SerializableFilter (defined outside this chunk) lets only part of them through - confirm.
+            final CountDownLatch latch = new CountDownLatch(5);
+
+            CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() {
+                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                    for (Object evt : iterable) {
+                        latch.countDown();
+
+                        log.info("Received event: " + evt);
+                    }
+                }
+            };
+
+            uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true);
+
+            for (int i = 10; i < 20; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, SECONDS));
+        }
+        finally {
+            // Cancel the query only if it was actually started; the cache is destroyed in any case.
+            if (uuid != null)
+                grid(0).context().cache().cache(cache.getName()).context().continuousQueries()
+                    .cancelInternalQuery(uuid);
+
+            cache.destroy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates the cache, registers listeners according to {@code deploy}, runs {@code ITERATION_CNT}
+     * rounds of one random update per node, then closes all cursors, deregisters all listeners and
+     * destroys the cache.
+     */
+    @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            // The seed is logged so that the generated sequence can be reproduced from the log.
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>();
+
+            // CLIENT: one listener on node NODES - 1; SERVER: one listener on a random node among the
+            // first NODES - 1; otherwise: a listener on each of the first NODES - 1 nodes.
+            if (deploy == CLIENT)
+                evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean()));
+            else if (deploy == SERVER)
+                evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs,
+                    rnd.nextBoolean()));
+            else {
+                // All listeners of this run share the same randomly chosen sync flag.
+                boolean isSync = rnd.nextBoolean();
+
+                for (int i = 0; i < NODES - 1; i++)
+                    evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync));
+            }
+
+            // Expected cache content, maintained by randomUpdate.
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            // Expected update counter per partition, maintained by randomUpdate.
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 10 == 0)
+                        log.info("Iteration: " + i);
+
+                    // One random operation through every node's cache proxy per iteration.
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+                }
+            }
+            finally {
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
+
+                // T2 holds (node index, listener configuration) as stored by registerListener.
+                for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs)
+                    grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2());
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Registers a listener on the given node that appends every received event to a new queue. The
+     * listener is installed, at random, either as a JCache entry listener configuration or as a
+     * {@link ContinuousQuery}; in both cases the filter comes from {@link #createFilterFactory()}.
+     *
+     * @param cacheName Cache name.
+     * @param nodeIdx Node index.
+     * @param curs Cursors; the cursor of a started continuous query is added here.
+     * @param lsnrCfgs Listener configurations; a registered JCache configuration is added here together
+     *      with the node index.
+     * @param sync Last argument of the listener configuration constructor; unused when a continuous query
+     *      is started instead.
+     * @return Event queue
+     */
+    private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName,
+        int nodeIdx,
+        Collection<QueryCursor<?>> curs,
+        Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs,
+        boolean sync) {
+        final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+        // NOTE(review): this choice uses ThreadLocalRandom, not the seeded generator of the caller, so
+        // it cannot be reproduced from the logged seed.
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg =
+                new MutableCacheEntryListenerConfiguration<>(
+                    FactoryBuilder.factoryOf(new LocalNonSerialiseListener() {
+                        @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                            ? extends QueryTestValue>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    }),
+                    createFilterFactory(),
+                    true,
+                    sync
+                );
+
+            grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg);
+
+            lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg));
+        }
+        else {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> evts) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<?, ?> evt : evts)
+                        evtsQueue.add(evt);
+                }
+            });
+
+            qry.setRemoteFilterFactory(createFilterFactory());
+
+            QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry);
+
+            curs.add(cur);
+        }
+
+        return evtsQueue;
+    }
+
+    /**
+     * Supplies the factory of the event filter used by every listener this test registers.
+     *
+     * @return Filter factory.
+     */
+    @NotNull protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>>
+        createFilterFactory() {
+        return new FilterFactory();
+    }
+
+    /**
+     * Performs one randomly chosen cache operation on a random key and verifies what every listener
+     * queue receives.
+     * <p>
+     * The steps always run in this order: the cache operation, commit of the transaction (if one was
+     * started), then verification. When the operation is expected to modify the entry, the partition
+     * counter is advanced, the event is checked and the expected data is updated; otherwise the queues
+     * are checked to be empty.
+     *
+     * @param rnd Random generator.
+     * @param evtsQueues Events queue.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        // For transactional caches roughly half of the operations run inside an explicit transaction.
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            // Value the key is expected to hold afterwards; null means the entry is expected to be gone.
+            Object expVal = newVal;
+
+            // Whether the operation is expected to modify the entry at all.
+            boolean modifies = true;
+
+            switch (op) {
+                case 0:
+                    cache.put(key, newVal);
+
+                    break;
+
+                case 1:
+                    cache.getAndPut(key, newVal);
+
+                    break;
+
+                case 2:
+                    cache.remove(key);
+
+                    expVal = null;
+
+                    break;
+
+                case 3:
+                    cache.getAndRemove(key);
+
+                    expVal = null;
+
+                    break;
+
+                case 4:
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    break;
+
+                case 5:
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    expVal = null;
+
+                    break;
+
+                case 6:
+                    cache.putIfAbsent(key, newVal);
+
+                    modifies = oldVal == null;
+
+                    break;
+
+                case 7:
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    modifies = oldVal == null;
+
+                    break;
+
+                case 8:
+                    cache.replace(key, newVal);
+
+                    modifies = oldVal != null;
+
+                    break;
+
+                case 9:
+                    cache.getAndReplace(key, newVal);
+
+                    modifies = oldVal != null;
+
+                    break;
+
+                case 10: {
+                    Object replaceVal = value(rnd);
+
+                    cache.replace(key, replaceVal, newVal);
+
+                    // The conditional replace succeeds only if the guessed value matches the current
+                    // one; equals(null) is false, so a missing entry is never replaced.
+                    modifies = replaceVal.equals(oldVal);
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+
+            if (tx != null)
+                tx.commit();
+
+            if (modifies) {
+                updatePartitionCounter(cache, key, partCntr);
+
+                waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, expVal, oldVal);
+
+                if (expVal != null)
+                    expData.put(key, expVal);
+                else
+                    expData.remove(key);
+            }
+            else
+                checkNoEvent(evtsQueues);
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Picks one of the three isolation levels from {@code rnd.nextInt(3)}.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        switch (rnd.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+
+            case 1:
+                return REPEATABLE_READ;
+
+            default:
+                return SERIALIZABLE;
+        }
+    }
+
+    /**
+     * Picks optimistic or pessimistic concurrency from a single {@code rnd.nextBoolean()} call.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * Increments the expected update counter of the partition the key maps to; a partition without an
+     * entry yet starts from zero, so its first update stores {@code 1}.
+     *
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        int part = ignite.affinity(cache.getName()).partition(key);
+
+        Long prev = cntrs.get(part);
+
+        cntrs.put(part, prev == null ? 1L : prev + 1);
+    }
+
+    /**
+     * Creates a test value wrapping a random number from {@code [0, VALS)}.
+     *
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        int num = rnd.nextInt(VALS);
+
+        return new QueryTestValue(num);
+    }
+
+    /**
+     * Verifies the outcome of one cache update on every listener queue.
+     * <p>
+     * No event is expected when both the new and the old value are {@code null}, or when the new value is
+     * rejected by {@code isAccepted}. Otherwise each queue must deliver, within five seconds, an event
+     * with the given key, value and old value whose partition update counter equals the expected counter
+     * of the key's partition.
+     *
+     * @param evtsQueues Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
+        if ((val == null && oldVal == null
+            || (val != null && !isAccepted((QueryTestValue)val)))) {
+            checkNoEvent(evtsQueues);
+
+            return;
+        }
+
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
+
+            // Keep the boxed value: unboxing a missing counter would throw a NullPointerException before
+            // the assertion below gets a chance to report the problem.
+            Long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr.longValue(), qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * Asserts that none of the queues receives an event, waiting up to 50 milliseconds on each.
+     *
+     * @param evtsQueues Event queues to check.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> queue : evtsQueues)
+            assertNull(queue.poll(50, MILLISECONDS));
+    }
+
+    /**
+     * Remote filter that accepts events by value and fails the test if it is ever
+     * written to or read from a stream.
+     */
+    protected static class NonSerializableFilter
+        implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey,
+            CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable {
+        /** Public no-arg constructor, as required for {@link Externalizable} classes. */
+        public NonSerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) {
+            return isAccepted(evt.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            fail("Entry filter should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // Name the read direction so a failure here is distinguishable from one in writeExternal.
+            fail("Entry filter should not be unmarshaled.");
+        }
+
+        /**
+         * @param val Value, may be {@code null}.
+         * @return {@code True} if value is {@code null} or its {@code val1} field is even.
+         */
+        public static boolean isAccepted(QueryTestValue val) {
+            return val == null || val.val1 % 2 == 0;
+        }
+    }
+
+    /**
+     * Remote filter over integer values that accepts {@code null} and even values.
+     */
+    protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer> {
+        /** Default constructor. */
+        public SerializableFilter() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt)
+            throws CacheEntryListenerException {
+            Integer val = evt.getValue();
+
+            return isAccepted(val);
+        }
+
+        /**
+         * @param val Value, may be {@code null}.
+         * @return {@code True} if value is {@code null} or even.
+         */
+        public static boolean isAccepted(Integer val) {
+            if (val == null)
+                return true;
+
+            return val % 2 == 0;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class FilterFactory implements Factory<NonSerializableFilter> {
+        /** {@inheritDoc} */
+        @Override public NonSerializableFilter create() {
+            return new NonSerializableFilter();
+        }
+    }
+
+    /**
+     * Base local listener that routes created, updated, removed and expired notifications
+     * to a single {@link #onEvents(Iterable)} callback and refuses to be written to or read
+     * from a stream.
+     */
+    public abstract class LocalNonSerialiseListener implements
+        CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryCreatedListener<QueryTestKey, QueryTestValue>,
+        CacheEntryExpiredListener<QueryTestKey, QueryTestValue>,
+        CacheEntryRemovedListener<QueryTestKey, QueryTestValue>,
+        Externalizable {
+        /** Default constructor. */
+        public LocalNonSerialiseListener() {
+            // No-op.
+        }
+
+        /**
+         * Single callback receiving the events of every notification type.
+         *
+         * @param evts Events.
+         */
+        protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> evts);
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            onEvents(events);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            onEvents(events);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            onEvents(events);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey,
+            ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+            onEvents(events);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException("Failed. Listener should not be marshaled.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled.");
+        }
+    }
+}


Mime
View raw message