Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5209F17D98 for ; Thu, 2 Apr 2015 14:37:53 +0000 (UTC) Received: (qmail 14025 invoked by uid 500); 2 Apr 2015 14:37:26 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 13311 invoked by uid 500); 2 Apr 2015 14:37:26 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 11739 invoked by uid 99); 2 Apr 2015 14:31:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Apr 2015 14:31:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 02 Apr 2015 14:30:55 +0000 Received: (qmail 52493 invoked by uid 99); 2 Apr 2015 14:29:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Apr 2015 14:29:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5BB0EE2F41; Thu, 2 Apr 2015 14:29:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 02 Apr 2015 14:29:07 -0000 Message-Id: <8831a411bc82418ca1da442bf2a70ced@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] incubator-ignite git commit: # GG-9973: Fixed. X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-3 07ca9facd -> c66290121 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java deleted file mode 100644 index a3b1072..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jsr166.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * This class provides basic tests for {@link GridCacheWriteBehindStore}. - */ -public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { - /** - * Tests correct store shutdown when underlying store fails, - * - * @throws Exception If failed. - */ - public void testShutdownWithFailure() throws Exception { - final AtomicReference err = new AtomicReference<>(); - - multithreadedAsync(new Runnable() { - @Override public void run() { - try { - delegate.setShouldFail(true); - - initStore(2); - - try { - store.write(new CacheEntryImpl<>(1, "val1")); - store.write(new CacheEntryImpl<>(2, "val2")); - } - finally { - shutdownStore(); - - delegate.setShouldFail(false); - } - } - catch (Exception e) { - err.set(e); - } - } - }, 1).get(); - - if (err.get() != null) - throw err.get(); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleStore() throws Exception { - initStore(2); - - try { - store.write(new CacheEntryImpl<>(1, "v1")); - store.write(new CacheEntryImpl<>(2, "v2")); - - assertEquals("v1", store.load(1)); - assertEquals("v2", store.load(2)); - assertNull(store.load(3)); - - store.delete(1); - - assertNull(store.load(1)); - assertEquals("v2", store.load(2)); - assertNull(store.load(3)); - } - finally { - shutdownStore(); - } - } - - /** - * Check that all values written to the store will be in underlying store after timeout or due to size limits. - * - * @throws Exception If failed. - */ - @SuppressWarnings({"NullableProblems"}) - public void testValuePropagation() throws Exception { - // Need to test size-based write. - initStore(1); - - try { - for (int i = 0; i < CACHE_SIZE * 2; i++) - store.write(new CacheEntryImpl<>(i, "val" + i)); - - U.sleep(200); - - for (int i = 0; i < CACHE_SIZE; i++) { - String val = delegate.load(i); - - assertNotNull("Value for [key= " + i + "] was not written in store", val); - assertEquals("Invalid value [key=" + i + "]", "val" + i, val); - } - - U.sleep(FLUSH_FREQUENCY + 300); - - for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) { - String val = delegate.load(i); - - assertNotNull("Value for [key= " + i + "] was not written in store", val); - assertEquals("Invalid value [key=" + i + "]", "val" + i, val); - } - } - finally { - shutdownStore(); - } - } - - /** - * Tests store behaviour under continuous put of the same key with different values. - * - * @throws Exception If failed - */ - public void testContinuousPut() throws Exception { - initStore(2); - - try { - final AtomicBoolean running = new AtomicBoolean(true); - - final AtomicInteger actualPutCnt = new AtomicInteger(); - - IgniteInternalFuture fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - try { - while (running.get()) { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val-0")); - - actualPutCnt.incrementAndGet(); - - store.write(new CacheEntryImpl<>(i, "val" + i)); - - actualPutCnt.incrementAndGet(); - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, 1, "put"); - - U.sleep(FLUSH_FREQUENCY * 2 + 500); - - int delegatePutCnt = delegate.getPutAllCount(); - - running.set(false); - - fut.get(); - - log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]"); - - assertTrue("No puts were made to the underlying store", delegatePutCnt > 0); - assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); - } - finally { - shutdownStore(); - } - - // These checks must be done after the store shut down - assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); - - for (int i = 0; i < CACHE_SIZE; i++) - assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); - } - - /** - * Tests that all values were put into the store will be written to the underlying store - * after shutdown is called. - * - * @throws Exception If failed. - */ - public void testShutdown() throws Exception { - initStore(2); - - try { - final AtomicBoolean running = new AtomicBoolean(true); - - IgniteInternalFuture fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - try { - while (running.get()) { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val-0")); - - store.write(new CacheEntryImpl<>(i, "val" + i)); - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, 1, "put"); - - U.sleep(300); - - running.set(false); - - fut.get(); - } - finally { - shutdownStore(); - } - - // These checks must be done after the store shut down - assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); - - for (int i = 0; i < CACHE_SIZE; i++) - assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); - } - - /** - * Tests that all values will be written to the underlying store - * right in the same order as they were put into the store. - * - * @throws Exception If failed. - */ - public void testBatchApply() throws Exception { - delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap()); - - initStore(1); - - List intList = new ArrayList<>(CACHE_SIZE); - - try { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val" + i)); - - intList.add(i); - } - } - finally { - shutdownStore(); - } - - Map underlyingMap = delegate.getMap(); - - assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java new file mode 100644 index 0000000..c1dd081 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.store.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Harness for {@link GridCacheWriteBehindStore} tests. + */ +public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest { + /** Write cache size. */ + public static final int CACHE_SIZE = 1024; + + /** Value dump interval. */ + public static final int FLUSH_FREQUENCY = 1000; + + /** Underlying store. */ + protected GridCacheTestStore delegate = new GridCacheTestStore(); + + /** Tested store. */ + protected GridCacheWriteBehindStore store; + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + delegate = null; + store = null; + + super.afterTestsStopped(); + } + + /** + * Initializes store. + * + * @param flushThreadCnt Count of flush threads + * @throws Exception If failed. + */ + protected void initStore(int flushThreadCnt) throws Exception { + store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate); + + store.setFlushFrequency(FLUSH_FREQUENCY); + + store.setFlushSize(CACHE_SIZE); + + store.setFlushThreadCount(flushThreadCnt); + + delegate.reset(); + + store.start(); + } + + /** + * Shutdowns store. + * + * @throws Exception If failed. + */ + protected void shutdownStore() throws Exception { + store.stop(); + + assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty()); + } + + /** + * Performs multiple put, get and remove operations in several threads on a store. After + * all threads finished their operations, returns the total set of keys that should be + * in underlying store. + * + * @param threadCnt Count of threads that should update keys. + * @param keysPerThread Count of unique keys assigned to a thread. + * @return Set of keys that was totally put in store. + * @throws Exception If failed. + */ + protected Set runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception { + final ConcurrentMap> perThread = new ConcurrentHashMap<>(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final AtomicInteger cntr = new AtomicInteger(); + + final AtomicInteger operations = new AtomicInteger(); + + IgniteInternalFuture fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + // Initialize key set for this thread. + Set set = new HashSet<>(); + + Set old = perThread.putIfAbsent(Thread.currentThread().getName(), set); + + if (old != null) + set = old; + + List original = new ArrayList<>(); + + Random rnd = new Random(); + + for (int i = 0; i < keysPerThread; i++) + original.add(cntr.getAndIncrement()); + + try { + while (running.get()) { + int op = rnd.nextInt(3); + int idx = rnd.nextInt(keysPerThread); + + int key = original.get(idx); + + switch (op) { + case 0: + store.write(new CacheEntryImpl<>(key, "val" + key)); + set.add(key); + + operations.incrementAndGet(); + + break; + + case 1: + store.delete(key); + set.remove(key); + + operations.incrementAndGet(); + + break; + + case 2: + default: + store.write(new CacheEntryImpl<>(key, "broken")); + + String val = store.load(key); + + assertEquals("Invalid intermediate value: " + val, "broken", val); + + store.write(new CacheEntryImpl<>(key, "val" + key)); + + set.add(key); + + // 2 put operations performed here. + operations.incrementAndGet(); + operations.incrementAndGet(); + operations.incrementAndGet(); + + break; + } + } + } + catch (Exception e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, threadCnt, "put"); + + U.sleep(10000); + + running.set(false); + + fut.get(); + + log().info(">>> " + operations + " operations performed totally"); + + Set total = new HashSet<>(); + + for (Set threadVals : perThread.values()) { + total.addAll(threadVals); + } + + return total; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java new file mode 100644 index 0000000..d4d6f02 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Basic store test. + */ +public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest { + /** Flush frequency. */ + private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000; + + /** Cache store. */ + private static final GridCacheTestStore store = new GridCacheTestStore(); + + /** + * + */ + protected GridCacheWriteBehindStoreAbstractTest() { + super(true /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + store.resetTimestamp(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + IgniteCache cache = jcache(); + + if (cache != null) + cache.clear(); + + store.reset(); + } + + /** @return Caching mode. */ + protected abstract CacheMode cacheMode(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration c = super.getConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setCacheStoreFactory(singletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + cc.setWriteBehindEnabled(true); + cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY); + + c.setCacheConfiguration(cc); + + return c; + } + + /** @throws Exception If test fails. */ + public void testWriteThrough() throws Exception { + IgniteCache cache = jcache(); + + Map map = store.getMap(); + + assert map.isEmpty(); + + Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ); + + try { + for (int i = 1; i <= 10; i++) { + cache.put(i, Integer.toString(i)); + + checkLastMethod(null); + } + + tx.commit(); + } + finally { + tx.close(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("putAll"); + + assert cache.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + store.resetLastMethod(); + + tx = grid().transactions().txStart(); + + try { + for (int i = 1; i <= 10; i++) { + String val = cache.getAndRemove(i); + + checkLastMethod(null); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + tx.commit(); + } + finally { + tx.close(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("removeAll"); + + assert map.isEmpty(); + } + + /** @throws Exception If test failed. */ + public void testReadThrough() throws Exception { + IgniteCache cache = jcache(); + + Map map = store.getMap(); + + assert map.isEmpty(); + + try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (int i = 1; i <= 10; i++) + cache.put(i, Integer.toString(i)); + + checkLastMethod(null); + + tx.commit(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("putAll"); + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + cache.clear(); + + assert cache.localSize() == 0; + assert cache.localSize() == 0; + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + assert map.size() == 10; + + for (int i = 1; i <= 10; i++) { + // Read through. + String val = cache.get(i); + + checkLastMethod("load"); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + assert cache.size() == 10; + + cache.clear(); + + assert cache.localSize() == 0; + assert cache.localSize() == 0; + + assert map.size() == 10; + + Set keys = new HashSet<>(); + + for (int i = 1; i <= 10; i++) + keys.add(i); + + // Read through. + Map vals = cache.getAll(keys); + + checkLastMethod("loadAll"); + + assert vals != null; + assert vals.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = vals.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + // Write through. + cache.removeAll(keys); + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("removeAll"); + + assert cache.localSize() == 0; + assert cache.localSize() == 0; + + assert map.isEmpty(); + } + + /** @throws Exception If failed. */ + public void testMultithreaded() throws Exception { + final ConcurrentMap> perThread = new ConcurrentHashMap<>(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final IgniteCache cache = jcache(); + + IgniteInternalFuture fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + // Initialize key set for this thread. + Set set = new HashSet<>(); + + Set old = perThread.putIfAbsent(Thread.currentThread().getName(), set); + + if (old != null) + set = old; + + Random rnd = new Random(); + + int keyCnt = 20000; + + while (running.get()) { + int op = rnd.nextInt(2); + int key = rnd.nextInt(keyCnt); + + switch (op) { + case 0: + cache.put(key, "val" + key); + set.add(key); + + break; + + case 1: + default: + cache.remove(key); + set.remove(key); + + break; + } + } + } + }, 10, "put"); + + U.sleep(10000); + + running.set(false); + + fut.get(); + + U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY); + + Map stored = store.getMap(); + + for (Map.Entry entry : stored.entrySet()) { + int key = entry.getKey(); + + assertEquals("Invalid value for key " + key, "val" + key, entry.getValue()); + + boolean found = false; + + for (Set threadPuts : perThread.values()) { + if (threadPuts.contains(key)) { + found = true; + + break; + } + } + + assert found : "No threads found that put key " + key; + } + } + + /** @param mtd Expected last method value. */ + private void checkLastMethod(@Nullable String mtd) { + String lastMtd = store.getLastMethod(); + + if (mtd == null) + assert lastMtd == null : "Last method must be null: " + lastMtd; + else { + assert lastMtd != null : "Last method must be not null"; + assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']'; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java new file mode 100644 index 0000000..2325fa6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in grid configuration. + */ +public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.LOCAL; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java new file mode 100644 index 0000000..3bcebb0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Multithreaded tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}. + */ +public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { + /** + * This test performs complex set of operations on store from multiple threads. + * + * @throws Exception If failed. + */ + public void testPutGetRemove() throws Exception { + initStore(2); + + Set exp; + + try { + exp = runPutGetRemoveMultithreaded(10, 10); + } + finally { + shutdownStore(); + } + + Map map = delegate.getMap(); + + Collection extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } + + /** + * Tests that cache would keep values if underlying store fails. + * + * @throws Exception If failed. + */ + public void testStoreFailure() throws Exception { + delegate.setShouldFail(true); + + initStore(2); + + Set exp; + + try { + exp = runPutGetRemoveMultithreaded(10, 10); + + U.sleep(FLUSH_FREQUENCY); + + info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state"); + + delegate.setShouldFail(false); + + // Despite that we set shouldFail flag to false, flush thread may just have caught an exception. + // If we move store to the stopping state right away, this value will be lost. That's why this sleep + // is inserted here to let all exception handlers in write-behind store exit. + U.sleep(1000); + } + finally { + shutdownStore(); + } + + Map map = delegate.getMap(); + + Collection extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } + + /** + * Tests store consistency in case of high put rate, when flush is performed from the same thread + * as put or remove operation. + * + * @throws Exception If failed. + */ + public void testFlushFromTheSameThread() throws Exception { + // 50 milliseconds should be enough. + delegate.setOperationDelay(50); + + initStore(2); + + Set exp; + + int start = store.getWriteBehindTotalCriticalOverflowCount(); + + try { + //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value. + exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE); + } + finally { + log.info(">>> Done inserting, shutting down the store"); + + shutdownStore(); + } + + // Restore delay. + delegate.setOperationDelay(0); + + Map map = delegate.getMap(); + + int end = store.getWriteBehindTotalCriticalOverflowCount(); + + log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected"); + + assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start); + + Collection extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java new file mode 100644 index 0000000..e9821fb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Tests write-behind store with near and dht commit option. + */ +public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest { + /** Grids to start. */ + private static final int GRID_CNT = 5; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Flush frequency. */ + public static final int WRITE_BEHIND_FLUSH_FREQ = 1000; + + /** Stores per grid. */ + private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT]; + + /** Start grid counter. */ + private int idx; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(CacheMode.PARTITIONED); + cc.setWriteBehindEnabled(true); + cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setNearConfiguration(new NearCacheConfiguration()); + + CacheStore store = stores[idx] = new GridCacheTestStore(); + + cc.setCacheStoreFactory(singletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + idx++; + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stores = null; + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + private void prepare() throws Exception { + idx = 0; + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleWritesOnDhtNode() throws Exception { + checkSingleWrites(); + } + + /** + * @throws Exception If failed. + */ + public void testBatchWritesOnDhtNode() throws Exception { + checkBatchWrites(); + } + + /** + * @throws Exception If failed. + */ + public void testTxWritesOnDhtNode() throws Exception { + checkTxWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkSingleWrites() throws Exception { + prepare(); + + IgniteCache cache = grid(0).cache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, String.valueOf(i)); + + checkWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkBatchWrites() throws Exception { + prepare(); + + Map map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, String.valueOf(i)); + + grid(0).cache(null).putAll(map); + + checkWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkTxWrites() throws Exception { + prepare(); + + IgniteCache cache = grid(0).cache(null); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 100; i++) + cache.put(i, String.valueOf(i)); + + tx.commit(); + } + + checkWrites(); + } + + /** + * @throws IgniteInterruptedCheckedException If sleep was interrupted. + */ + private void checkWrites() throws IgniteInterruptedCheckedException { + U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2); + + Collection allKeys = new ArrayList<>(100); + + for (int i = 0; i < GRID_CNT; i++) { + Map map = stores[i].getMap(); + + assertFalse("Missing writes for node: " + i, map.isEmpty()); + + allKeys.addAll(map.keySet()); + + // Check there is no intersection. + for (int j = 0; j < GRID_CNT; j++) { + if (i == j) + continue; + + Collection intersection = new HashSet<>(stores[j].getMap().keySet()); + + intersection.retainAll(map.keySet()); + + assertTrue(intersection.isEmpty()); + } + } + + assertEquals(100, allKeys.size()); + + for (int i = 0; i < 100; i++) + assertTrue(allKeys.contains(i)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java new file mode 100644 index 0000000..fe589ca --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in partitioned configuration. + */ +public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java new file mode 100644 index 0000000..26f8431 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in grid configuration. + */ +public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java new file mode 100644 index 0000000..937e597 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.store; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jsr166.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * This class provides basic tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}. + */ +public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { + /** + * Tests correct store shutdown when underlying store fails, + * + * @throws Exception If failed. + */ + public void testShutdownWithFailure() throws Exception { + final AtomicReference err = new AtomicReference<>(); + + multithreadedAsync(new Runnable() { + @Override public void run() { + try { + delegate.setShouldFail(true); + + initStore(2); + + try { + store.write(new CacheEntryImpl<>(1, "val1")); + store.write(new CacheEntryImpl<>(2, "val2")); + } + finally { + shutdownStore(); + + delegate.setShouldFail(false); + } + } + catch (Exception e) { + err.set(e); + } + } + }, 1).get(); + + if (err.get() != null) + throw err.get(); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleStore() throws Exception { + initStore(2); + + try { + store.write(new CacheEntryImpl<>(1, "v1")); + store.write(new CacheEntryImpl<>(2, "v2")); + + assertEquals("v1", store.load(1)); + assertEquals("v2", store.load(2)); + assertNull(store.load(3)); + + store.delete(1); + + assertNull(store.load(1)); + assertEquals("v2", store.load(2)); + assertNull(store.load(3)); + } + finally { + shutdownStore(); + } + } + + /** + * Check that all values written to the store will be in underlying store after timeout or due to size limits. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"NullableProblems"}) + public void testValuePropagation() throws Exception { + // Need to test size-based write. + initStore(1); + + try { + for (int i = 0; i < CACHE_SIZE * 2; i++) + store.write(new CacheEntryImpl<>(i, "val" + i)); + + U.sleep(200); + + for (int i = 0; i < CACHE_SIZE; i++) { + String val = delegate.load(i); + + assertNotNull("Value for [key= " + i + "] was not written in store", val); + assertEquals("Invalid value [key=" + i + "]", "val" + i, val); + } + + U.sleep(FLUSH_FREQUENCY + 300); + + for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) { + String val = delegate.load(i); + + assertNotNull("Value for [key= " + i + "] was not written in store", val); + assertEquals("Invalid value [key=" + i + "]", "val" + i, val); + } + } + finally { + shutdownStore(); + } + } + + /** + * Tests store behaviour under continuous put of the same key with different values. + * + * @throws Exception If failed + */ + public void testContinuousPut() throws Exception { + initStore(2); + + try { + final AtomicBoolean running = new AtomicBoolean(true); + + final AtomicInteger actualPutCnt = new AtomicInteger(); + + IgniteInternalFuture fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + try { + while (running.get()) { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val-0")); + + actualPutCnt.incrementAndGet(); + + store.write(new CacheEntryImpl<>(i, "val" + i)); + + actualPutCnt.incrementAndGet(); + } + } + } + catch (Exception e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, 1, "put"); + + U.sleep(FLUSH_FREQUENCY * 2 + 500); + + int delegatePutCnt = delegate.getPutAllCount(); + + running.set(false); + + fut.get(); + + log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]"); + + assertTrue("No puts were made to the underlying store", delegatePutCnt > 0); + assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); + } + finally { + shutdownStore(); + } + + // These checks must be done after the store shut down + assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); + + for (int i = 0; i < CACHE_SIZE; i++) + assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); + } + + /** + * Tests that all values were put into the store will be written to the underlying store + * after shutdown is called. + * + * @throws Exception If failed. + */ + public void testShutdown() throws Exception { + initStore(2); + + try { + final AtomicBoolean running = new AtomicBoolean(true); + + IgniteInternalFuture fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + try { + while (running.get()) { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val-0")); + + store.write(new CacheEntryImpl<>(i, "val" + i)); + } + } + } + catch (Exception e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, 1, "put"); + + U.sleep(300); + + running.set(false); + + fut.get(); + } + finally { + shutdownStore(); + } + + // These checks must be done after the store shut down + assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); + + for (int i = 0; i < CACHE_SIZE; i++) + assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); + } + + /** + * Tests that all values will be written to the underlying store + * right in the same order as they were put into the store. + * + * @throws Exception If failed. + */ + public void testBatchApply() throws Exception { + delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap()); + + initStore(1); + + List intList = new ArrayList<>(CACHE_SIZE); + + try { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val" + i)); + + intList.add(i); + } + } + finally { + shutdownStore(); + } + + Map underlyingMap = delegate.getMap(); + + assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 20a9caf..b277d48 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.jta.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.plugin.*; @@ -59,10 +60,7 @@ public class GridCacheTestContext extends GridCacheContext { true, new GridCacheEventManager(), new GridCacheSwapManager(false), - new GridCacheStoreManager(null, - new IdentityHashMap(), - null, - new CacheConfiguration()), + new CacheOsStoreManager(null, new CacheConfiguration()), new GridCacheEvictionManager(), new GridCacheLocalQueryManager(), new CacheContinuousQueryManager(), @@ -74,5 +72,7 @@ public class GridCacheTestContext extends GridCacheContext { new CacheOsConflictResolutionManager(), new CachePluginManager(ctx, new CacheConfiguration()) ); + + store().initialize(null, new IdentityHashMap()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java index 452dbf1..529b227 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java @@ -19,9 +19,10 @@ package org.apache.ignite.testsuites; import junit.framework.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.store.*; /** - * Test suite that contains all tests for {@link GridCacheWriteBehindStore}. + * Test suite that contains all tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}. */ public class IgniteCacheWriteBehindTestSuite extends TestSuite { /**