ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [16/21] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list".
Date Mon, 25 Apr 2016 08:05:01 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
new file mode 100644
index 0000000..7d975f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ * Checks that continuous query listeners and remote filters observe updates of each key in order.
+ */
+public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest {
+    /** */
+    public static final int LISTENER_CNT = 3;
+
+    /** */
+    public static final int KEYS = 10;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    public static final int ITERATION_CNT = 100;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static volatile boolean fail;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Every node is configured with the same static IP finder instance.
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        // NOTE(review): -1 presumably turns shared-memory communication off - confirm against TcpCommunicationSpi.
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)igniteCfg.getCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        // Client flag is flipped by beforeTestsStarted() before the last node is started.
+        igniteCfg.setClientMode(client);
+
+        MemoryEventStorageSpi evtStoreSpi = new MemoryEventStorageSpi();
+
+        evtStoreSpi.setExpireCount(100);
+
+        igniteCfg.setEventStorageSpi(evtStoreSpi);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final int lastIdx = NODES - 1;
+
+        // Started while the client flag still has its initial value.
+        startGridsMultiThreaded(lastIdx);
+
+        // From here on getConfiguration() produces client-mode configurations.
+        client = true;
+
+        startGrid(lastIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Stop the nodes started in beforeTestsStarted() before delegating to the parent.
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // The flag is static and is raised by remote filters; reset it so one test cannot fail the next one.
+        fail = false;
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, ATOMIC cache:
+     * 2 backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapTwoBackup() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED, PRIMARY_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, ATOMIC cache:
+     * 2 backups, OFFHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTwoBackup() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED, PRIMARY_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, ATOMIC cache:
+     * 2 backups, OFFHEAP_VALUES memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesTwoBackup() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES, PRIMARY_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a REPLICATED, ATOMIC cache:
+     * OFFHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedOffheap() throws Exception {
+        doOrderingTest(cacheConfiguration(REPLICATED, 0, ATOMIC, OFFHEAP_TIERED, PRIMARY_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, TRANSACTIONAL
+     * cache: 2 backups, ONHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapTwoBackup() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED, FULL_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, TRANSACTIONAL
+     * cache: no backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapWithoutBackup() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, PRIMARY_SYNC), false);
+    }
+
+    /**
+     * Checks update ordering with the non-annotated listener and remote filter on a PARTITIONED, TRANSACTIONAL
+     * cache: no backups, ONHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapWithoutBackupFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, FULL_SYNC), false);
+    }
+
+    // ASYNC
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapTwoBackupAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, ONHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapTwoBackupAsyncFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED, FULL_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, OFFHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTwoBackupAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, OFFHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTwoBackupAsyncFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED, FULL_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, OFFHEAP_VALUES memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesTwoBackupAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: 2 backups, OFFHEAP_VALUES memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesTwoBackupAsyncFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES, FULL_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a REPLICATED,
+     * ATOMIC cache: ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a REPLICATED,
+     * ATOMIC cache: ONHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAsyncFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED, FULL_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a REPLICATED,
+     * ATOMIC cache: OFFHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedOffheapAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(REPLICATED, 0, ATOMIC, OFFHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * ATOMIC cache: no backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapWithoutBackupAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * TRANSACTIONAL cache: 2 backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapTwoBackupAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * TRANSACTIONAL cache: no backups, ONHEAP_TIERED memory, PRIMARY_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapAsync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, PRIMARY_SYNC), true);
+    }
+
+    /**
+     * Checks update ordering with the {@code @IgniteAsyncCallback} listener and remote filter on a PARTITIONED,
+     * TRANSACTIONAL cache: no backups, ONHEAP_TIERED memory, FULL_SYNC writes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapAsyncFullSync() throws Exception {
+        doOrderingTest(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, FULL_SYNC), true);
+    }
+
+    /**
+     * Registers {@link #LISTENER_CNT} continuous queries on each of the {@link #NODES} nodes, increments a small
+     * set of keys concurrently from several threads and then checks that every listener received, per key,
+     * values growing by exactly one and that no remote filter flagged an ordering violation.
+     *
+     * @param ccfg Cache configuration.
+     * @param async If {@code true}, the {@link IgniteAsyncCallback}-annotated listener and remote filter are used.
+     * @throws Exception If failed.
+     */
+    protected void doOrderingTest(
+        final CacheConfiguration ccfg,
+        final boolean async)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        List<QueryCursor<?>> qries = new ArrayList<>();
+
+        try {
+            List<BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>>> rcvdEvts =
+                new ArrayList<>(LISTENER_CNT * NODES);
+
+            final AtomicInteger qryCntr = new AtomicInteger(0);
+
+            final int threadCnt = 20;
+
+            for (int idx = 0; idx < NODES; idx++) {
+                for (int i = 0; i < LISTENER_CNT; i++) {
+                    // Capacity equals the total number of updates the writer threads perform.
+                    BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue =
+                        new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt);
+
+                    ContinuousQuery qry = new ContinuousQuery();
+
+                    if (async) {
+                        qry.setLocalListener(new TestCacheAsyncEventListener(queue, qryCntr));
+
+                        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                            new CacheTestRemoteFilterAsync(ccfg.getName())));
+                    }
+                    else {
+                        qry.setLocalListener(new TestCacheEventListener(queue, qryCntr));
+
+                        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(
+                            new CacheTestRemoteFilter(ccfg.getName())));
+                    }
+
+                    rcvdEvts.add(queue);
+
+                    IgniteCache<Object, Object> cache = grid(idx).cache(ccfg.getName());
+
+                    QueryCursor qryCursor = cache.query(qry);
+
+                    qries.add(qryCursor);
+                }
+            }
+
+            IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < ITERATION_CNT; i++) {
+                        IgniteCache<QueryTestKey, QueryTestValue> cache =
+                            grid(rnd.nextInt(NODES)).cache(ccfg.getName());
+
+                        QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));
+
+                        // Explicit transactions are used only for transactional caches, and then only randomly.
+                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
+                            TRANSACTIONAL && rnd.nextBoolean();
+
+                        Transaction tx = null;
+
+                        if (startTx)
+                            tx = cache.unwrap(Ignite.class).transactions().txStart();
+
+                        try {
+                            if ((cache.get(key) == null) || rnd.nextBoolean()) {
+                                cache.invoke(key, new CacheEntryProcessor<QueryTestKey, QueryTestValue, Object>() {
+                                    @Override public Object process(
+                                        MutableEntry<QueryTestKey, QueryTestValue> entry,
+                                        Object... arguments)
+                                        throws EntryProcessorException {
+                                        if (entry.exists())
+                                            entry.setValue(new QueryTestValue(entry.getValue().val1 + 1));
+                                        else
+                                            entry.setValue(new QueryTestValue(0));
+
+                                        return null;
+                                    }
+                                });
+                            }
+                            else {
+                                QueryTestValue val;
+                                QueryTestValue newVal;
+
+                                // Optimistic increment: retry until no concurrent writer changed the value.
+                                do {
+                                    val = cache.get(key);
+
+                                    newVal = val == null ?
+                                        new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
+                                }
+                                while (!cache.replace(key, val, newVal));
+                            }
+
+                            // Commit only on the success path so that a failed operation is not committed
+                            // and its exception is not masked by a failing commit.
+                            if (tx != null)
+                                tx.commit();
+                        }
+                        finally {
+                            // Always release the transaction; per the Transaction contract close() rolls back
+                            // a transaction that has not been committed.
+                            if (tx != null)
+                                tx.close();
+                        }
+                    }
+                }
+            }, threadCnt, "put-thread");
+
+            f.get(15, TimeUnit.SECONDS);
+
+            // Result is intentionally ignored: checkEvents() below asserts the exact per-listener event count.
+            GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT * NODES;
+                }
+            }, 1000L);
+
+            for (BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue : rcvdEvts)
+                checkEvents(queue, ITERATION_CNT * threadCnt);
+
+            assertFalse("Ordering invocations of filter broken.", fail);
+        }
+        finally {
+            for (QueryCursor<?> qry : qries)
+                qry.close();
+
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Drains the queue and verifies that, for every key, the first received value is {@code 0} and each
+     * following value is the previous one plus one; finally checks the number of drained events.
+     *
+     * @param queue Event queue.
+     * @param expCnt Expected number of events in the queue.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, int expCnt)
+        throws Exception {
+        Map<QueryTestKey, Integer> lastVals = new HashMap<>();
+
+        int rcvd = 0;
+
+        while (true) {
+            // A 100 ms silence is treated as the end of the event stream.
+            CacheEntryEvent<QueryTestKey, QueryTestValue> evt = queue.poll(100, TimeUnit.MILLISECONDS);
+
+            if (evt == null)
+                break;
+
+            assertNotNull(evt.getKey());
+
+            Integer prev = lastVals.get(evt.getKey());
+
+            if (prev != null)
+                assertEquals("Key event: " + evt.getKey(), new QueryTestValue(prev + 1), evt.getValue());
+            else
+                assertEquals(new QueryTestValue(0), evt.getValue());
+
+            lastVals.put(evt.getKey(), evt.getValue().val1);
+
+            rcvd++;
+        }
+
+        assertEquals(expCnt, rcvd);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // Eight minutes, expressed in milliseconds.
+        return TimeUnit.MINUTES.toMillis(8);
+    }
+
+    /**
+     * Variant of {@link CacheTestRemoteFilter} that carries the {@link IgniteAsyncCallback} annotation;
+     * the filtering logic itself is inherited unchanged.
+     */
+    @IgniteAsyncCallback
+    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter {
+        /**
+         * @param cacheName Cache name.
+         */
+        public CacheTestRemoteFilterAsync(String cacheName) {
+            super(cacheName);
+        }
+    }
+
+    /**
+     * Remote filter that passes every event through and, on the node that is primary for the key, checks that
+     * each new value equals the previously seen value plus one. A violation raises the static {@code fail} flag.
+     */
+    private static class CacheTestRemoteFilter implements
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
+        /** Last value seen by this filter instance for each key. */
+        private Map<QueryTestKey, QueryTestValue> prevVals = new ConcurrentHashMap<>();
+
+        /** Injected Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Name of the cache the filter is registered for. */
+        private String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public CacheTestRemoteFilter(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) {
+            boolean primary = affinity(ignite.cache(cacheName)).isPrimary(ignite.cluster().localNode(), e.getKey());
+
+            // Ordering is only verified on the primary node for the key.
+            if (!primary)
+                return true;
+
+            QueryTestValue prev = prevVals.put(e.getKey(), e.getValue());
+
+            if (prev != null && !new QueryTestValue(prev.val1 + 1).equals(e.getValue()))
+                fail = true;
+
+            return true;
+        }
+    }
+
+    /**
+     * Variant of {@link TestCacheEventListener} that carries the {@link IgniteAsyncCallback} annotation;
+     * event handling is inherited unchanged.
+     */
+    @IgniteAsyncCallback
+    private static class TestCacheAsyncEventListener extends TestCacheEventListener {
+        /**
+         * @param queue Queue.
+         * @param cntr Received events counter.
+         */
+        public TestCacheAsyncEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey,
QueryTestValue>> queue,
+            AtomicInteger cntr) {
+            super(queue, cntr);
+        }
+    }
+
+    /**
+     * Local listener that appends every received event to a queue and increments a shared counter.
+     */
+    private static class TestCacheEventListener
+        implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
+        /** Destination for received events. */
+        private final BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue;
+
+        /** Counter of received events. */
+        private final AtomicInteger cntr;
+
+        /**
+         * @param queue Queue.
+         * @param cntr Received events counter.
+         */
+        public TestCacheEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue,
+            AtomicInteger cntr) {
+            this.queue = queue;
+            this.cntr = cntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(
+            Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts) {
+            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt : evts) {
+                CacheEntryEvent<QueryTestKey, QueryTestValue> typed =
+                    (CacheEntryEvent<QueryTestKey, QueryTestValue>)evt;
+
+                // add() rather than offer(): running out of queue capacity surfaces as an exception.
+                queue.add(typed);
+
+                cntr.incrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * Builds a cache configuration whose name encodes all of the given parameters.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when {@code cacheMode} is {@code PARTITIONED}.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param writeMode Cache write mode.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        CacheWriteSynchronizationMode writeMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        // The write mode is part of the name (memory mode used to be listed twice instead), so configurations
+        // differing only in write synchronization mode get distinct cache names.
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + writeMode + "-" +
+            backups);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(writeMode);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Test cache key wrapping a single integer; equality, hash code and ordering are all based on that integer.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Integer.compareTo() instead of subtraction: the difference of two ints can overflow
+            // and report the wrong sign.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Test cache value holding an integer together with its string representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Numeric value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of {@link #val1}. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            QueryTestValue other = (QueryTestValue)o;
+
+            if (!val1.equals(other.val1))
+                return false;
+
+            return val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index e9fbf70..a6c33bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -118,6 +119,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
         cfg.setClientMode(client);
 
@@ -598,6 +600,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>
evts =
                 new CopyOnWriteArrayList<>();
 
+            if (noOpFilterFactory() != null)
+                qry.setRemoteFilterFactory(noOpFilterFactory());
+
             qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>()
{
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
QueryTestKey,
                     ? extends QueryTestValue>> events) throws CacheEntryListenerException
{
@@ -684,9 +689,17 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     checkSingleEvent(evts.get(7), CREATED, new QueryTestValue(5), null);
                     checkSingleEvent(evts.get(8), EventType.UPDATED, new QueryTestValue(6),
new QueryTestValue(5));
 
+                    evts.clear();
+
                     cache.remove(key);
                     cache.remove(key);
 
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return evts.size() == 1;
+                        }
+                    }, 5_000);
+
                     evts.clear();
 
                     log.info("Finish iteration: " + i);
@@ -699,6 +712,13 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     }
 
     /**
+     * @return No-op filter factory for batch operations, or {@code null} (as here) to leave the remote filter unset.
+     */
+    protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>>
noOpFilterFactory() {
+        return null;
+    }
+
+    /**
      * @param ccfg Cache configuration.
      * @throws Exception If failed.
      */
@@ -711,6 +731,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>
evts =
                 new CopyOnWriteArrayList<>();
 
+            if (noOpFilterFactory() != null)
+                qry.setRemoteFilterFactory(noOpFilterFactory());
+
             qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>()
{
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
QueryTestKey,
                     ? extends QueryTestValue>> events) throws CacheEntryListenerException
{

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index b4c31be..e4afe73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -60,6 +60,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 30625fe..761d4bd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -44,7 +44,6 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
         suite.addTestSuite(BinarySerializationQuerySelfTest.class);
         suite.addTestSuite(BinarySerializationQueryWithReflectiveSerializerSelfTest.class);
         suite.addTestSuite(IgniteCacheBinaryObjectsScanSelfTest.class);
-        suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
 
         //Should be adjusted. Not ready to be used with BinaryMarshaller.
         //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index fbb3091..e0e81b7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -20,9 +20,14 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationFromCallbackTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
@@ -87,12 +92,17 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTwoNodesTest.class);
-        suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
         suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class);
+        suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class);
+        suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class);
+        suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class);
+        suite.addTestSuite(CacheContinuousQueryOperationFromCallbackTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
         suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class);
+        suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
index fa4e642..c4fcdac 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -44,6 +47,10 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
 
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java
new file mode 100644
index 0000000..0ea66d4
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+
+/**
+ * Probe which calculate continuous query events.
+ */
+public class CacheEntryEventAsyncProbe extends CacheEntryEventProbe {
+    /** */
+    @Override protected CacheEntryUpdatedListener<Integer, Integer> localListener(AtomicLong cntr) {
+        return new CacheEntryEventListener(cntr);
+    }
+
+    /**
+     *
+     */
+    @IgniteAsyncCallback
+    private static final class CacheEntryEventListener implements CacheEntryUpdatedListener<Integer, Integer> {
+        /** */
+        private AtomicLong cnt;
+
+        /**
+         * @param cnt Counter.
+         */
+        public CacheEntryEventListener(AtomicLong cnt) {
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events)
+        throws CacheEntryListenerException {
+            int size = 0;
+
+            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events)
+                ++size;
+
+            cnt.addAndGet(size);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java
index e42479a..a25f975 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java
@@ -45,9 +45,6 @@ public class CacheEntryEventProbe implements BenchmarkProbe {
     /** */
     private BenchmarkConfiguration cfg;
 
-    /** Counter. */
-    private AtomicLong cnt = new AtomicLong(0);
-
     /** Collected points. */
     private Collection<BenchmarkProbePoint> collected = new ArrayList<>();
 
@@ -67,17 +64,9 @@ public class CacheEntryEventProbe implements BenchmarkProbe {
             if (drv0.cache() != null) {
                 ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
 
-                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>>
-                        events) throws CacheEntryListenerException {
-                        int size = 0;
-
-                        for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events)
-                            ++size;
+                final AtomicLong cnt = new AtomicLong();
 
-                        cnt.addAndGet(size);
-                    }
-                });
+                qry.setLocalListener(localListener(cnt));
 
                 qryCur = drv0.cache().query(qry);
 
@@ -113,6 +102,24 @@ public class CacheEntryEventProbe implements BenchmarkProbe {
                 + " probe. Probably, the driver doesn't provide \"cache()\" method.");
     }
 
+    /**
+     * @param cntr Received event counter.
+     * @return Local listener.
+     */
+    protected CacheEntryUpdatedListener<Integer, Integer> localListener(final AtomicLong cntr) {
+        return new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> events)
+                throws CacheEntryListenerException {
+                int size = 0;
+
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events)
+                    ++size;
+
+                cntr.addAndGet(size);
+            }
+        };
+    }
+
     /** {@inheritDoc} */
     @Override public void stop() throws Exception {
         if (qryCur != null) {


Mime
View raw message