Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BA66B200B0F for ; Fri, 17 Jun 2016 14:38:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B8557160A84; Fri, 17 Jun 2016 12:38:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 731C5160A61 for ; Fri, 17 Jun 2016 14:38:31 +0200 (CEST) Received: (qmail 56303 invoked by uid 500); 17 Jun 2016 12:38:30 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 56277 invoked by uid 99); 17 Jun 2016 12:38:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jun 2016 12:38:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 73B38DFC74; Fri, 17 Jun 2016 12:38:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 17 Jun 2016 12:38:32 -0000 Message-Id: <833c804857a6441cab717d0fdb848edb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/43] ignite git commit: ignite-114 Load value from store for cache 'invoke' archived-at: Fri, 17 Jun 2016 12:38:32 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java new file mode 100644 index 0000000..bb092d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughStoreCallTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionIsolation.values; + +/** + * + */ +public class IgniteCacheReadThroughStoreCallTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final Map storeMap = new ConcurrentHashMap8<>(); + + /** */ + protected boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + storeMap.clear(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testMultiNode() throws Exception { + startGridsMultiThreaded(4); + + client = true; + + startGrid(4); + + checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 0)); + + checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 1)); + + checkLoadCount(cacheConfiguration(PARTITIONED, ATOMIC, 2)); + + checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 0)); + + checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1)); + + checkLoadCount(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2)); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void checkLoadCount(CacheConfiguration ccfg) throws Exception { + storeMap.clear(); + + Ignite ignite0 = ignite(0); + + ignite0.createCache(ccfg); + + try { + int key = 0; + + for (Ignite node : G.allGrids()) { + log.info("Test for node: " + node.name()); + + final IgniteCache cache = node.cache(ccfg.getName()); + + for (int i = 0; i < 50; i++) { + final int k = key++; + + checkReadThrough(cache, new IgniteRunnable() { + @Override public void run() { + cache.invoke(k, new TestEntryProcessor()); + } + }, null, null, 1); + } + + for (int i = 0; i < 50; i++) { + final int k = key++; + + checkReadThrough(cache, new IgniteRunnable() { + @Override public void run() { + cache.put(k, k); + } + }, null, null, 0); + } + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : values()) { + log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + for (int i = 0; i < 50; i++) { + final int k = key++; + + checkReadThrough(cache, new IgniteRunnable() { + @Override public void run() { + cache.invoke(k, new TestEntryProcessor()); + } + }, concurrency, isolation, 2); + } + } + } + } + } + + ignite0.cache(ccfg.getName()).removeAll(); + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + + /** + * @param cache Cache. + * @param c Cache operation Closure. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param expLoadCnt Expected number of store 'load' calls. + * @throws Exception If failed. + */ + private void checkReadThrough(IgniteCache cache, + IgniteRunnable c, + @Nullable TransactionConcurrency concurrency, + @Nullable TransactionIsolation isolation, + int expLoadCnt) throws Exception { + TestStore.loadCnt.set(0); + + Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation) + : null; + + try { + c.run(); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + assertEquals(expLoadCnt, TestStore.loadCnt.get()); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + @SuppressWarnings("unchecked") + protected CacheConfiguration cacheConfiguration(CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + int backups) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class TestStoreFactory implements Factory { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** + * + */ + public static class TestStore extends CacheStoreAdapter { + /** */ + static AtomicInteger loadCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + fail(); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + loadCnt.incrementAndGet(); + + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } + + /** + * + */ + static class TestEntryProcessor implements EntryProcessor { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... args) { + Object val = entry.getValue(); + + entry.setValue(entry.getKey()); + + return val; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java index 28a954b..dd6b268 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoaderWriterAbstractTest.java @@ -147,6 +147,16 @@ public abstract class IgniteCacheLoaderWriterAbstractTest extends IgniteCacheAbs assertFalse(storeMap.containsKey(key)); assertNull(cache.get(key)); + + ldrCallCnt.set(0); + + cache.invoke(key, new EntryProcessor() { + @Override public Object process(MutableEntry e, Object... args) { + return null; + } + }); + + checkCalls(1, 0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 5a017e6..883f426 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -65,8 +65,10 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationDefau import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationTemplateTest; import org.apache.ignite.internal.processors.cache.IgniteCacheDynamicStopSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughSingleNodeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest; import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest; import org.apache.ignite.internal.processors.cache.IgniteCacheTxCopyOnReadDisabledTest; import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest; import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalStoreValueTest; @@ -203,7 +205,9 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheTxLocalPeekModesTest.class); suite.addTestSuite(IgniteCacheTxReplicatedPeekModesTest.class); + suite.addTestSuite(IgniteCacheInvokeReadThroughSingleNodeTest.class); suite.addTestSuite(IgniteCacheInvokeReadThroughTest.class); + suite.addTestSuite(IgniteCacheReadThroughStoreCallTest.class); suite.addTestSuite(GridCacheVersionMultinodeTest.class); suite.addTestSuite(IgniteCacheNearReadCommittedTest.class);