Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9BACB19191 for ; Sat, 23 Apr 2016 08:47:03 +0000 (UTC) Received: (qmail 79888 invoked by uid 500); 23 Apr 2016 08:47:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 79816 invoked by uid 500); 23 Apr 2016 08:47:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 79694 invoked by uid 99); 23 Apr 2016 08:47:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Apr 2016 08:47:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F12F4DF97F; Sat, 23 Apr 2016 08:47:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Sat, 23 Apr 2016 08:47:04 -0000 Message-Id: <35b9f489f15741009a880418c7205c70@git.apache.org> In-Reply-To: <2c94c76db9bf4b1281c9e6f6237c7c7d@git.apache.org> References: <2c94c76db9bf4b1281c9e6f6237c7c7d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/7] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list". http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java new file mode 100644 index 0000000..7d975f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest { + /** */ + public static final int LISTENER_CNT = 3; + + /** */ + public static final int KEYS = 10; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** */ + private static volatile boolean fail; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(100); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + fail = false; + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedOffheap() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapTwoBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapWithoutBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapWithoutBackupFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, false); + } + + // ASYNC + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackupAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsyncFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedOffheapAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapWithoutBackupAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapTwoBackupAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsyncFullSync() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @param ccfg Cache configuration. + * @param async Async filter. + * @throws Exception If failed. + */ + protected void doOrderingTest( + final CacheConfiguration ccfg, + final boolean async) + throws Exception { + ignite(0).createCache(ccfg); + + List> qries = new ArrayList<>(); + + try { + List>> rcvdEvts = + new ArrayList<>(LISTENER_CNT * NODES); + + final AtomicInteger qryCntr = new AtomicInteger(0); + + final int threadCnt = 20; + + for (int idx = 0; idx < NODES; idx++) { + for (int i = 0; i < LISTENER_CNT; i++) { + BlockingQueue> queue = + new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt); + + ContinuousQuery qry = new ContinuousQuery(); + + if (async) { + qry.setLocalListener(new TestCacheAsyncEventListener(queue, qryCntr)); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new CacheTestRemoteFilterAsync(ccfg.getName()))); + } + else { + qry.setLocalListener(new TestCacheEventListener(queue, qryCntr)); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new CacheTestRemoteFilter(ccfg.getName()))); + } + + rcvdEvts.add(queue); + + IgniteCache cache = grid(idx).cache(ccfg.getName()); + + QueryCursor qryCursor = cache.query(qry); + + qries.add(qryCursor); + } + } + + IgniteInternalFuture f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < ITERATION_CNT; i++) { + IgniteCache cache = + grid(rnd.nextInt(NODES)).cache(ccfg.getName()); + + QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); + + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + TRANSACTIONAL && rnd.nextBoolean(); + + Transaction tx = null; + + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); + + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) { + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process( + MutableEntry entry, + Object... arguments) + throws EntryProcessorException { + if (entry.exists()) + entry.setValue(new QueryTestValue(entry.getValue().val1 + 1)); + else + entry.setValue(new QueryTestValue(0)); + + return null; + } + }); + } + else { + QueryTestValue val; + QueryTestValue newVal; + + do { + val = cache.get(key); + + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); + } + while (!cache.replace(key, val, newVal)); + } + } + finally { + if (tx != null) + tx.commit(); + } + } + } + }, threadCnt, "put-thread"); + + f.get(15, TimeUnit.SECONDS); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT * NODES; + } + }, 1000L); + + for (BlockingQueue> queue : rcvdEvts) + checkEvents(queue, ITERATION_CNT * threadCnt); + + assertFalse("Ordering invocations of filter broken.", fail); + } + finally { + for (QueryCursor qry : qries) + qry.close(); + + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param queue Event queue. + * @throws Exception If failed. + */ + private void checkEvents(BlockingQueue> queue, int expCnt) + throws Exception { + CacheEntryEvent evt; + int cnt = 0; + Map vals = new HashMap<>(); + + while ((evt = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { + assertNotNull(evt); + assertNotNull(evt.getKey()); + + Integer preVal = vals.get(evt.getKey()); + + if (preVal == null) + assertEquals(new QueryTestValue(0), evt.getValue()); + else { + if (!new QueryTestValue(preVal + 1).equals(evt.getValue())) + assertEquals("Key event: " + evt.getKey(), new QueryTestValue(preVal + 1), evt.getValue()); + } + + vals.put(evt.getKey(), evt.getValue().val1); + + ++cnt; + } + + assertEquals(expCnt, cnt); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.MINUTES.toMillis(8); + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilterAsync(String cacheName) { + super(cacheName); + } + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter { + /** */ + private Map prevVals = new ConcurrentHashMap<>(); + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilter(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent e) { + if (affinity(ignite.cache(cacheName)).isPrimary(ignite.cluster().localNode(), e.getKey())) { + QueryTestValue prevVal = prevVals.put(e.getKey(), e.getValue()); + + if (prevVal != null) { + if (!new QueryTestValue(prevVal.val1 + 1).equals(e.getValue())) + fail = true; + } + } + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class TestCacheAsyncEventListener extends TestCacheEventListener { + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheAsyncEventListener(BlockingQueue> queue, + AtomicInteger cntr) { + super(queue, cntr); + } + } + + /** + * + */ + private static class TestCacheEventListener implements CacheEntryUpdatedListener { + /** */ + private final BlockingQueue> queue; + + /** */ + private final AtomicInteger cntr; + + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheEventListener(BlockingQueue> queue, + AtomicInteger cntr) { + this.queue = queue; + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) { + for (CacheEntryEvent e : evts) { + queue.add((CacheEntryEvent)e); + + cntr.incrementAndGet(); + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param writeMode Cache write mode. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + CacheWriteSynchronizationMode writeMode) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + memoryMode + "-" + + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(writeMode); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index e9fbf70..a6c33bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -118,6 +119,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); cfg.setClientMode(client); @@ -598,6 +600,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener() { @Override public void onUpdated(Iterable> events) throws CacheEntryListenerException { @@ -684,9 +689,17 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract checkSingleEvent(evts.get(7), CREATED, new QueryTestValue(5), null); checkSingleEvent(evts.get(8), EventType.UPDATED, new QueryTestValue(6), new QueryTestValue(5)); + evts.clear(); + cache.remove(key); cache.remove(key); + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return evts.size() == 1; + } + }, 5_000); + evts.clear(); log.info("Finish iteration: " + i); @@ -699,6 +712,13 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** + * @return No-op filter factory for batch operations. + */ + protected Factory> noOpFilterFactory() { + return null; + } + + /** * @param ccfg Cache configuration. * @throws Exception If failed. */ @@ -711,6 +731,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener() { @Override public void onUpdated(Iterable> events) throws CacheEntryListenerException { http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index b4c31be..e4afe73 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -60,6 +60,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index 30625fe..761d4bd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -44,7 +44,6 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(BinarySerializationQuerySelfTest.class); suite.addTestSuite(BinarySerializationQueryWithReflectiveSerializerSelfTest.class); suite.addTestSuite(IgniteCacheBinaryObjectsScanSelfTest.class); - suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); //Should be adjusted. Not ready to be used with BinaryMarshaller. //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index fbb3091..e0e81b7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -20,9 +20,14 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationFromCallbackTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; @@ -87,12 +92,17 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTwoNodesTest.class); - suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class); suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class); + suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class); + suite.addTestSuite(CacheContinuousQueryOperationFromCallbackTest.class); suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class); suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class); + suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java index fa4e642..c4fcdac 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java @@ -18,6 +18,9 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; @@ -44,6 +47,10 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java new file mode 100644 index 0000000..0ea66d4 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventAsyncProbe.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.lang.IgniteAsyncCallback; + +/** + * Probe which calculate continuous query events. + */ +public class CacheEntryEventAsyncProbe extends CacheEntryEventProbe { + /** */ + @Override protected CacheEntryUpdatedListener localListener(AtomicLong cntr) { + return new CacheEntryEventListener(cntr); + } + + /** + * + */ + @IgniteAsyncCallback + private static final class CacheEntryEventListener implements CacheEntryUpdatedListener { + /** */ + private AtomicLong cnt; + + /** + * @param cnt Counter. + */ + public CacheEntryEventListener(AtomicLong cnt) { + this.cnt = cnt; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> events) + throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent e : events) + ++size; + + cnt.addAndGet(size); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff64c2a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java index e42479a..a25f975 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java @@ -45,9 +45,6 @@ public class CacheEntryEventProbe implements BenchmarkProbe { /** */ private BenchmarkConfiguration cfg; - /** Counter. */ - private AtomicLong cnt = new AtomicLong(0); - /** Collected points. */ private Collection collected = new ArrayList<>(); @@ -67,17 +64,9 @@ public class CacheEntryEventProbe implements BenchmarkProbe { if (drv0.cache() != null) { ContinuousQuery qry = new ContinuousQuery<>(); - qry.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable> - events) throws CacheEntryListenerException { - int size = 0; - - for (CacheEntryEvent e : events) - ++size; + final AtomicLong cnt = new AtomicLong(); - cnt.addAndGet(size); - } - }); + qry.setLocalListener(localListener(cnt)); qryCur = drv0.cache().query(qry); @@ -113,6 +102,24 @@ public class CacheEntryEventProbe implements BenchmarkProbe { + " probe. Probably, the driver doesn't provide \"cache()\" method."); } + /** + * @param cntr Received event counter. + * @return Local listener. + */ + protected CacheEntryUpdatedListener localListener(final AtomicLong cntr) { + return new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> events) + throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent e : events) + ++size; + + cntr.addAndGet(size); + } + }; + } + /** {@inheritDoc} */ @Override public void stop() throws Exception { if (qryCur != null) {