Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 605142009D9 for ; Thu, 19 May 2016 11:37:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5F32B160A2A; Thu, 19 May 2016 09:37:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 35635160A04 for ; Thu, 19 May 2016 11:37:35 +0200 (CEST) Received: (qmail 38492 invoked by uid 500); 19 May 2016 09:37:34 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38423 invoked by uid 99); 19 May 2016 09:37:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 May 2016 09:37:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3BDC9DFFD9; Thu, 19 May 2016 09:37:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 19 May 2016 09:37:36 -0000 Message-Id: In-Reply-To: <5f92691f09ae4a6b98f1a9e14dc4ff17@git.apache.org> References: <5f92691f09ae4a6b98f1a9e14dc4ff17@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/43] ignite git commit: IGNITE-2004 Fixed "Asynchronous execution of ContinuousQuery's remote filter & local list". archived-at: Thu, 19 May 2016 09:37:38 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java new file mode 100644 index 0000000..0605bc8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -0,0 +1,986 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(1000); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /// + /// ASYNC FILTER AND LISTENER. TEST LISTENER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTx() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeap() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomic() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomic() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicOffHeap() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListener() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicated() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /// + /// ASYNC FILTER AND LISTENER. TEST FILTER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTx() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeap() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapValues() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomic() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedAtomic() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeap() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilter() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicated() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /// + /// ASYNC LISTENER. TEST LISTENER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @param ccfg Cache configuration. + * @param asyncFltr Async filter. + * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for registration entry update listener. + * @throws Exception If failed. + */ + private void testNonDeadLockInListener(CacheConfiguration ccfg, + final boolean asyncFltr, + boolean asyncLsnr, + boolean jcacheApi) throws Exception { + ignite(0).createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + log.info("Start iteration: " + i); + + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); + + IgniteBiInClosure> fltrClsr = + new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, CacheEntryEvent e) { + if (asyncFltr) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + } + }; + + IgniteBiInClosure> lsnrClsr = + new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, CacheEntryEvent e) { + IgniteCache cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); + + return; + } + else if (!val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter rmtFltr = asyncFltr ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); + + qry = cache.query(conQry); + } + + try { + if (rnd.nextBoolean()) + cache.put(key, val0); + else { + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); + } + + assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS)); + + assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from listener.", U.await(latch, 3, SECONDS)); + } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } + + log.info("Iteration finished: " + i); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param ccfg Cache configuration. + * @param asyncFilter Async filter. + * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for start update listener. + * @throws Exception If failed. + */ + private void testNonDeadLockInFilter(CacheConfiguration ccfg, + final boolean asyncFilter, + final boolean asyncLsnr, + boolean jcacheApi) throws Exception { + ignite(0).createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + log.info("Start iteration: " + i); + + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); + + IgniteBiInClosure> fltrClsr = + new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, CacheEntryEvent e) { + if (asyncFilter) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + + IgniteCache cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); + + return; + } + else if (!val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class) + .getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + IgniteBiInClosure> lsnrClsr = + new IgniteBiInClosure>() { + @Override public void apply(Ignite ignite, CacheEntryEvent e) { + if (asyncLsnr) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(new QueryTestValue(1))) + return; + + assertEquals(val, val0); + + latch.countDown(); + } + }; + + + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter rmtFltr = asyncFilter ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); + + qry = cache.query(conQry); + } + + try { + if (rnd.nextBoolean()) + cache.put(key, val0); + else + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); + + assert U.await(latch, 3, SECONDS) : "Failed to waiting event."; + + assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS)); + } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } + + log.info("Iteration finished: " + i); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cache Ignite cache. + * @return Key. + */ + private QueryTestKey affinityKey(IgniteCache cache) { + Affinity aff = affinity(cache); + + for (int i = 0; i < 10_000; i++) { + QueryTestKey key = new QueryTestKey(i); + + if (aff.isPrimary(localNode(cache), key)) + return key; + } + + throw new IgniteException("Failed to found primary key."); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.SECONDS.toMillis(15); + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilterAsync( + IgniteBiInClosure> clsr) { + super(clsr); + } + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure> clsr; + + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilter(IgniteBiInClosure> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent e) + throws CacheEntryListenerException { + clsr.apply(ignite, e); + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheInvokeListenerAsync extends CacheInvokeListener { + /** + * @param clsr Closure. + */ + public CacheInvokeListenerAsync( + IgniteBiInClosure> clsr) { + super(clsr); + } + } + + /** + * + */ + private static class CacheInvokeListener implements CacheEntryUpdatedListener, + CacheEntryCreatedListener, Serializable { + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure> clsr; + + /** + * @param clsr Closure. + */ + public CacheInvokeListener(IgniteBiInClosure> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> events) + throws CacheEntryListenerException { + for (CacheEntryEvent e : events) + clsr.apply(ignite, e); + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable> events) throws CacheEntryListenerException { + for (CacheEntryEvent e : events) + clsr.apply(ignite, e); + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue)o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java new file mode 100644 index 0000000..928cfda --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest + extends CacheContinuousQueryFactoryFilterRandomOperationTest { + /** {@inheritDoc} */ + @NotNull @Override protected Factory> + createFilterFactory() { + return new AsyncFilterFactory(); + } + + /** + * + */ + @IgniteAsyncCallback + protected static class NonSerializableAsyncFilter implements + CacheEntryEventSerializableFilter, Externalizable { + /** */ + public NonSerializableAsyncFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) { + assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + + assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + return isAccepted(evt.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); + } + + /** + * @param val Value. + * @return {@code True} if value is even. + */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; + } + } + + /** + * + */ + protected static class AsyncFilterFactory implements Factory { + /** {@inheritDoc} */ + @Override public NonSerializableAsyncFilter create() { + return new NonSerializableAsyncFilter(); + } + } + + /** {@inheritDoc} */ + @Override protected Factory> noOpFilterFactory() { + return FactoryBuilder.factoryOf(NoopAsyncFilter.class); + } + + /** + * + */ + @IgniteAsyncCallback + protected static class NoopAsyncFilter implements + CacheEntryEventSerializableFilter, Externalizable { + /** */ + public NoopAsyncFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) { + assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + + assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + return true; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java new file mode 100644 index 0000000..1178086 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java @@ -0,0 +1,725 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.NotNull; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest.NonSerializableFilter.isAccepted; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheContinuousQueryRandomOperationsTest { + /** */ + private static final int NODES = 5; + + /** */ + private static final int KEYS = 50; + + /** */ + private static final int VALS = 10; + + /** */ + public static final int ITERATION_CNT = 40; + + /** + * @throws Exception If failed. + */ + public void testInternalQuery() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(REPLICATED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + final IgniteCache cache = grid(0).getOrCreateCache(ccfg); + + UUID uuid = null; + + try { + for (int i = 0; i < 10; i++) + cache.put(i, i); + + final CountDownLatch latch = new CountDownLatch(5); + + CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + for (Object evt : iterable) { + latch.countDown(); + + log.info("Received event: " + evt); + } + } + }; + + uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); + + for (int i = 10; i < 20; i++) + cache.put(i, i); + + assertTrue(latch.await(3, SECONDS)); + } + finally { + if (uuid != null) + grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .cancelInternalQuery(uuid); + + cache.destroy(); + } + } + + /** {@inheritDoc} */ + @Override protected void doTestContinuousQuery(CacheConfiguration ccfg, ContinuousDeploy deploy) + throws Exception { + ignite(0).createCache(ccfg); + + try { + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + List>> evtsQueues = new ArrayList<>(); + + Collection> curs = new ArrayList<>(); + + Collection> lsnrCfgs = new ArrayList<>(); + + if (deploy == CLIENT) + evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); + else if (deploy == SERVER) + evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, + rnd.nextBoolean())); + else { + boolean isSync = rnd.nextBoolean(); + + for (int i = 0; i < NODES - 1; i++) + evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); + } + + ConcurrentMap expData = new ConcurrentHashMap<>(); + + Map partCntr = new ConcurrentHashMap<>(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 10 == 0) + log.info("Iteration: " + i); + + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); + } + } + finally { + for (QueryCursor cur : curs) + cur.close(); + + for (T2 e : lsnrCfgs) + grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cacheName Cache name. + * @param nodeIdx Node index. + * @param curs Cursors. + * @param lsnrCfgs Listener configurations. + * @return Event queue + */ + private BlockingQueue> registerListener(String cacheName, + int nodeIdx, + Collection> curs, + Collection> lsnrCfgs, + boolean sync) { + final BlockingQueue> evtsQueue = new ArrayBlockingQueue<>(50_000); + + if (ThreadLocalRandom.current().nextBoolean()) { + MutableCacheEntryListenerConfiguration lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { + @Override protected void onEvents(Iterable> evts) { + for (CacheEntryEvent evt : evts) + evtsQueue.add(evt); + } + }), + createFilterFactory(), + true, + sync + ); + + grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); + + lsnrCfgs.add(new T2(nodeIdx, lsnrCfg)); + } + else { + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) throws CacheEntryListenerException { + for (CacheEntryEvent evt : evts) + evtsQueue.add(evt); + } + }); + + qry.setRemoteFilterFactory(createFilterFactory()); + + QueryCursor cur = grid(nodeIdx).cache(cacheName).query(qry); + + curs.add(cur); + } + + return evtsQueue; + } + + /** + * @return Filter factory. + */ + @NotNull protected Factory> createFilterFactory() { + return new FilterFactory(); + } + + /** + * @param rnd Random generator. + * @param evtsQueues Events queue. + * @param expData Expected cache data. + * @param partCntr Partition counter. + * @param cache Cache. + * @throws Exception If failed. + */ + private void randomUpdate( + Random rnd, + List>> evtsQueues, + ConcurrentMap expData, + Map partCntr, + IgniteCache cache) + throws Exception { + Object key = new QueryTestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + Ignite ignite = cache.unwrap(Ignite.class); + + Transaction tx = null; + + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); + + try { + // log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + } + else { + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + + break; + } + + default: + fail("Op:" + op); + } + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param cache Cache. + * @param key Key + * @param cntrs Partition counters. + */ + private void updatePartitionCounter(IgniteCache cache, Object key, Map cntrs) { + Affinity aff = cache.unwrap(Ignite.class).affinity(cache.getName()); + + int part = aff.partition(key); + + Long partCntr = cntrs.get(part); + + if (partCntr == null) + partCntr = 0L; + + cntrs.put(part, ++partCntr); + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new QueryTestValue(rnd.nextInt(VALS)); + } + + /** + * @param evtsQueues Event queue. + * @param partCntrs Partition counters. + * @param aff Affinity function. + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @throws Exception If failed. + */ + private void waitAndCheckEvent(List>> evtsQueues, + Map partCntrs, + Affinity aff, + Object key, + Object val, + Object oldVal) + throws Exception { + if ((val == null && oldVal == null + || (val != null && !isAccepted((QueryTestValue)val)))) { + checkNoEvent(evtsQueues); + + return; + } + + for (BlockingQueue> evtsQueue : evtsQueues) { + CacheEntryEvent evt = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } + } + + /** + * @param evtsQueues Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(List>> evtsQueues) throws Exception { + for (BlockingQueue> evtsQueue : evtsQueues) { + CacheEntryEvent evt = evtsQueue.poll(50, MILLISECONDS); + + assertNull(evt); + } + } + + /** + * + */ + protected static class NonSerializableFilter + implements CacheEntryEventSerializableFilter, Externalizable { + /** */ + public NonSerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) { + return isAccepted(evt.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); + } + + /** + * @param val Value. + * @return {@code True} if value is even. + */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; + } + } + + /** + * + */ + protected static class SerializableFilter implements CacheEntryEventSerializableFilter { + /** */ + public SerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) + throws CacheEntryListenerException { + return isAccepted(evt.getValue()); + } + + /** + * @return {@code True} if value is even. + */ + public static boolean isAccepted(Integer val) { + return val == null || val % 2 == 0; + } + } + + /** + * + */ + protected static class FilterFactory implements Factory { + /** {@inheritDoc} */ + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); + } + } + + /** + * + */ + public abstract class LocalNonSerialiseListener implements + CacheEntryUpdatedListener, + CacheEntryCreatedListener, + CacheEntryExpiredListener, + CacheEntryRemovedListener, + Externalizable { + /** */ + public LocalNonSerialiseListener() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** + * @param evts Events. + */ + protected abstract void onEvents(Iterable> evts); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Failed. Listener should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); + } + } +}