ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [23/30] incubator-ignite git commit: # GG-9973: Fixed.
Date Thu, 02 Apr 2015 19:23:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
deleted file mode 100644
index a3b1072..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * This class provides basic tests for {@link GridCacheWriteBehindStore}.
- */
-public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
-    /**
-     * Tests correct store shutdown when underlying store fails,
-     *
-     * @throws Exception If failed.
-     */
-    public void testShutdownWithFailure() throws Exception {
-        final AtomicReference<Exception> err = new AtomicReference<>();
-
-        multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    delegate.setShouldFail(true);
-
-                    initStore(2);
-
-                    try {
-                        store.write(new CacheEntryImpl<>(1, "val1"));
-                        store.write(new CacheEntryImpl<>(2, "val2"));
-                    }
-                    finally {
-                        shutdownStore();
-
-                        delegate.setShouldFail(false);
-                    }
-                }
-                catch (Exception e) {
-                    err.set(e);
-                }
-            }
-        }, 1).get();
-
-        if (err.get() != null)
-            throw err.get();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleStore() throws Exception {
-        initStore(2);
-
-        try {
-            store.write(new CacheEntryImpl<>(1, "v1"));
-            store.write(new CacheEntryImpl<>(2, "v2"));
-
-            assertEquals("v1", store.load(1));
-            assertEquals("v2", store.load(2));
-            assertNull(store.load(3));
-
-            store.delete(1);
-
-            assertNull(store.load(1));
-            assertEquals("v2", store.load(2));
-            assertNull(store.load(3));
-        }
-        finally {
-            shutdownStore();
-        }
-    }
-
-    /**
-     * Check that all values written to the store will be in underlying store after timeout or due to size limits.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings({"NullableProblems"})
-    public void testValuePropagation() throws Exception {
-        // Need to test size-based write.
-        initStore(1);
-
-        try {
-            for (int i = 0; i < CACHE_SIZE * 2; i++)
-                store.write(new CacheEntryImpl<>(i, "val" + i));
-
-            U.sleep(200);
-
-            for (int i = 0; i < CACHE_SIZE; i++) {
-                String val = delegate.load(i);
-
-                assertNotNull("Value for [key= " + i + "] was not written in store", val);
-                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
-            }
-
-            U.sleep(FLUSH_FREQUENCY + 300);
-
-            for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) {
-                String val = delegate.load(i);
-
-                assertNotNull("Value for [key= " + i + "] was not written in store", val);
-                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
-            }
-        }
-        finally {
-            shutdownStore();
-        }
-    }
-
-    /**
-     * Tests store behaviour under continuous put of the same key with different values.
-     *
-     * @throws Exception If failed
-     */
-    public void testContinuousPut() throws Exception {
-        initStore(2);
-
-        try {
-            final AtomicBoolean running = new AtomicBoolean(true);
-
-            final AtomicInteger actualPutCnt = new AtomicInteger();
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @SuppressWarnings({"NullableProblems"})
-                @Override public void run() {
-                    try {
-                        while (running.get()) {
-                            for (int i = 0; i < CACHE_SIZE; i++) {
-                                store.write(new CacheEntryImpl<>(i, "val-0"));
-
-                                actualPutCnt.incrementAndGet();
-
-                                store.write(new CacheEntryImpl<>(i, "val" + i));
-
-                                actualPutCnt.incrementAndGet();
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        error("Unexpected exception in put thread", e);
-
-                        assert false;
-                    }
-                }
-            }, 1, "put");
-
-            U.sleep(FLUSH_FREQUENCY * 2 + 500);
-
-            int delegatePutCnt = delegate.getPutAllCount();
-
-            running.set(false);
-
-            fut.get();
-
-            log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]");
-
-            assertTrue("No puts were made to the underlying store", delegatePutCnt > 0);
-            assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
-        }
-        finally {
-            shutdownStore();
-        }
-
-        // These checks must be done after the store shut down
-        assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size());
-
-        for (int i = 0; i < CACHE_SIZE; i++)
-            assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i));
-    }
-
-    /**
-     * Tests that all values were put into the store will be written to the underlying store
-     * after shutdown is called.
-     *
-     * @throws Exception If failed.
-     */
-    public void testShutdown() throws Exception {
-        initStore(2);
-
-        try {
-            final AtomicBoolean running = new AtomicBoolean(true);
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-                @SuppressWarnings({"NullableProblems"})
-                @Override public void run() {
-                    try {
-                        while (running.get()) {
-                            for (int i = 0; i < CACHE_SIZE; i++) {
-                                store.write(new CacheEntryImpl<>(i, "val-0"));
-
-                                store.write(new CacheEntryImpl<>(i, "val" + i));
-                            }
-                        }
-                    }
-                    catch (Exception e) {
-                        error("Unexpected exception in put thread", e);
-
-                        assert false;
-                    }
-                }
-            }, 1, "put");
-
-            U.sleep(300);
-
-            running.set(false);
-
-            fut.get();
-        }
-        finally {
-            shutdownStore();
-        }
-
-        // These checks must be done after the store shut down
-        assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size());
-
-        for (int i = 0; i < CACHE_SIZE; i++)
-            assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i));
-    }
-
-    /**
-     * Tests that all values will be written to the underlying store
-     * right in the same order as they were put into the store.
-     *
-     * @throws Exception If failed.
-     */
-    public void testBatchApply() throws Exception {
-        delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>());
-
-        initStore(1);
-
-        List<Integer> intList = new ArrayList<>(CACHE_SIZE);
-
-        try {
-            for (int i = 0; i < CACHE_SIZE; i++) {
-                store.write(new CacheEntryImpl<>(i, "val" + i));
-
-                intList.add(i);
-            }
-        }
-        finally {
-            shutdownStore();
-        }
-
-        Map<Integer, String> underlyingMap = delegate.getMap();
-
-        assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
new file mode 100644
index 0000000..c1dd081
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Harness for {@link GridCacheWriteBehindStore} tests.
+ */
+public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridCommonAbstractTest {
+    /** Write cache size. */
+    public static final int CACHE_SIZE = 1024;
+
+    /** Value dump interval. */
+    public static final int FLUSH_FREQUENCY = 1000;
+
+    /** Underlying store. */
+    protected GridCacheTestStore delegate = new GridCacheTestStore();
+
+    /** Tested store. */
+    protected GridCacheWriteBehindStore<Integer, String> store;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        delegate = null;
+        store = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Initializes store.
+     *
+     * @param flushThreadCnt Count of flush threads.
+     * @throws Exception If failed.
+     */
+    protected void initStore(int flushThreadCnt) throws Exception {
+        store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate);
+
+        store.setFlushFrequency(FLUSH_FREQUENCY);
+
+        store.setFlushSize(CACHE_SIZE);
+
+        store.setFlushThreadCount(flushThreadCnt);
+
+        delegate.reset();
+
+        store.start();
+    }
+
+    /**
+     * Shuts down the store.
+     *
+     * @throws Exception If failed.
+     */
+    protected void shutdownStore() throws Exception {
+        store.stop();
+
+        assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty());
+    }
+
+    /**
+     * Performs multiple put, get and remove operations in several threads on a store. After
+     * all threads have finished their operations, returns the total set of keys that should be
+     * in underlying store.
+     *
+     * @param threadCnt Count of threads that should update keys.
+     * @param keysPerThread Count of unique keys assigned to a thread.
+     * @return Union of the per-thread key sets, i.e. keys written and not subsequently removed.
+     * @throws Exception If failed.
+     */
+    protected Set<Integer> runPutGetRemoveMultithreaded(int threadCnt, final int keysPerThread) throws Exception {
+        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        final AtomicInteger operations = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @SuppressWarnings({"NullableProblems"})
+            @Override public void run() {
+                // Initialize key set for this thread.
+                Set<Integer> set = new HashSet<>();
+
+                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
+
+                if (old != null)
+                    set = old;
+
+                List<Integer> original = new ArrayList<>();
+
+                Random rnd = new Random();
+
+                for (int i = 0; i < keysPerThread; i++)
+                    original.add(cntr.getAndIncrement());
+
+                try {
+                    while (running.get()) {
+                        int op = rnd.nextInt(3);
+                        int idx = rnd.nextInt(keysPerThread);
+
+                        int key = original.get(idx);
+
+                        switch (op) {
+                            case 0:
+                                store.write(new CacheEntryImpl<>(key, "val" + key));
+                                set.add(key);
+
+                                operations.incrementAndGet();
+
+                                break;
+
+                            case 1:
+                                store.delete(key);
+                                set.remove(key);
+
+                                operations.incrementAndGet();
+
+                                break;
+
+                            case 2:
+                            default:
+                                store.write(new CacheEntryImpl<>(key, "broken"));
+
+                                String val = store.load(key);
+
+                                assertEquals("Invalid intermediate value: " + val, "broken", val);
+
+                                store.write(new CacheEntryImpl<>(key, "val" + key));
+
+                                set.add(key);
+
+                                // 2 put operations and 1 load performed here, hence 3 increments.
+                                operations.incrementAndGet();
+                                operations.incrementAndGet();
+                                operations.incrementAndGet();
+
+                                break;
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    error("Unexpected exception in put thread", e);
+
+                    assert false;
+                }
+            }
+        }, threadCnt, "put");
+
+        U.sleep(10000);
+
+        running.set(false);
+
+        fut.get();
+
+        log().info(">>> " + operations + " operations performed totally");
+
+        Set<Integer> total = new HashSet<>();
+
+        for (Set<Integer> threadVals : perThread.values()) {
+            total.addAll(threadVals);
+        }
+
+        return total;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
new file mode 100644
index 0000000..d4d6f02
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Basic store test.
+ */
+public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest {
+    /** Flush frequency. */
+    private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000;
+
+    /** Cache store. */
+    private static final GridCacheTestStore store = new GridCacheTestStore();
+
+    /**
+     *
+     */
+    protected GridCacheWriteBehindStoreAbstractTest() {
+        super(true /*start grid. */);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        store.resetTimestamp();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        IgniteCache<?, ?> cache = jcache();
+
+        if (cache != null)
+            cache.clear();
+
+        store.reset();
+    }
+
+    /** @return Caching mode. */
+    protected abstract CacheMode cacheMode();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected final IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration c = super.getConfiguration();
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(cacheMode());
+        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setSwapEnabled(false);
+        cc.setAtomicityMode(TRANSACTIONAL);
+
+        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
+
+        cc.setWriteBehindEnabled(true);
+        cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /** @throws Exception If test fails. */
+    public void testWriteThrough() throws Exception {
+        IgniteCache<Integer, String> cache = jcache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                cache.put(i, Integer.toString(i));
+
+                checkLastMethod(null);
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        assert cache.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        store.resetLastMethod();
+
+        tx = grid().transactions().txStart();
+
+        try {
+            for (int i = 1; i <= 10; i++) {
+                String val = cache.getAndRemove(i);
+
+                checkLastMethod(null);
+
+                assert val != null;
+                assert val.equals(Integer.toString(i));
+            }
+
+            tx.commit();
+        }
+        finally {
+            tx.close();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert map.isEmpty();
+    }
+
+    /** @throws Exception If test failed. */
+    public void testReadThrough() throws Exception {
+        IgniteCache<Integer, String> cache = jcache();
+
+        Map<Integer, String> map = store.getMap();
+
+        assert map.isEmpty();
+
+        try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+            for (int i = 1; i <= 10; i++)
+                cache.put(i, Integer.toString(i));
+
+            checkLastMethod(null);
+
+            tx.commit();
+        }
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("putAll");
+
+        for (int i = 1; i <= 10; i++) {
+            String val = map.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        cache.clear();
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        assert map.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            // Read through.
+            String val = cache.get(i);
+
+            checkLastMethod("load");
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        assert cache.size() == 10;
+
+        cache.clear();
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        assert map.size() == 10;
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 1; i <= 10; i++)
+            keys.add(i);
+
+        // Read through.
+        Map<Integer, String> vals = cache.getAll(keys);
+
+        checkLastMethod("loadAll");
+
+        assert vals != null;
+        assert vals.size() == 10;
+
+        for (int i = 1; i <= 10; i++) {
+            String val = vals.get(i);
+
+            assert val != null;
+            assert val.equals(Integer.toString(i));
+        }
+
+        // Write through.
+        cache.removeAll(keys);
+
+        // Need to wait WFB flush timeout.
+        U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100);
+
+        checkLastMethod("removeAll");
+
+        assert cache.localSize() == 0;
+        assert cache.localSize() == 0;
+
+        assert map.isEmpty();
+    }
+
+    /** @throws Exception If failed. */
+    public void testMultithreaded() throws Exception {
+        final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>();
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        final IgniteCache<Integer, String> cache = jcache();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @SuppressWarnings({"NullableProblems"})
+            @Override public void run() {
+                // Initialize key set for this thread.
+                Set<Integer> set = new HashSet<>();
+
+                Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set);
+
+                if (old != null)
+                    set = old;
+
+                Random rnd = new Random();
+
+                int keyCnt = 20000;
+
+                while (running.get()) {
+                    int op = rnd.nextInt(2);
+                    int key = rnd.nextInt(keyCnt);
+
+                    switch (op) {
+                        case 0:
+                            cache.put(key, "val" + key);
+                            set.add(key);
+
+                            break;
+
+                        case 1:
+                        default:
+                            cache.remove(key);
+                            set.remove(key);
+
+                            break;
+                    }
+                }
+            }
+        }, 10, "put");
+
+        U.sleep(10000);
+
+        running.set(false);
+
+        fut.get();
+
+        U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY);
+
+        Map<Integer, String> stored = store.getMap();
+
+        for (Map.Entry<Integer, String> entry : stored.entrySet()) {
+            int key = entry.getKey();
+
+            assertEquals("Invalid value for key " + key, "val" + key, entry.getValue());
+
+            boolean found = false;
+
+            for (Set<Integer> threadPuts : perThread.values()) {
+                if (threadPuts.contains(key)) {
+                    found = true;
+
+                    break;
+                }
+            }
+
+            assert found : "No threads found that put key " + key;
+        }
+    }
+
+    /** @param mtd Expected last method value. */
+    private void checkLastMethod(@Nullable String mtd) {
+        String lastMtd = store.getLastMethod();
+
+        if (mtd == null)
+            assert lastMtd == null : "Last method must be null: " + lastMtd;
+        else {
+            assert lastMtd != null : "Last method must be not null";
+            assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']';
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
new file mode 100644
index 0000000..2325fa6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreLocalTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in grid configuration.
+ */
+public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.LOCAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
new file mode 100644
index 0000000..3bcebb0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Multithreaded tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * This test performs complex set of operations on store from multiple threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemove() throws Exception {
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
+    }
+
+    /**
+     * Tests that cache would keep values if underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStoreFailure() throws Exception {
+        delegate.setShouldFail(true);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        try {
+            exp = runPutGetRemoveMultithreaded(10, 10);
+
+            U.sleep(FLUSH_FREQUENCY);
+
+            info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state");
+
+            delegate.setShouldFail(false);
+
+            // Even though we set the shouldFail flag to false, the flush thread may have just caught an exception.
+            // If we move the store to the stopping state right away, this value will be lost. That's why this sleep
+            // is inserted here to let all exception handlers in the write-behind store exit.
+            U.sleep(1000);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> map = delegate.getMap();
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
+    }
+
+    /**
+     * Tests store consistency in case of high put rate, when flush is performed from the same thread
+     * as put or remove operation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFlushFromTheSameThread() throws Exception {
+        // 50 milliseconds should be enough.
+        delegate.setOperationDelay(50);
+
+        initStore(2);
+
+        Set<Integer> exp;
+
+        int start = store.getWriteBehindTotalCriticalOverflowCount();
+
+        try {
+            // We will have in total 5 * CACHE_SIZE keys, which should be enough to grow map size to critical value.
+            exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE);
+        }
+        finally {
+            log.info(">>> Done inserting, shutting down the store");
+
+            shutdownStore();
+        }
+
+        // Restore delay.
+        delegate.setOperationDelay(0);
+
+        Map<Integer, String> map = delegate.getMap();
+
+        int end = store.getWriteBehindTotalCriticalOverflowCount();
+
+        log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected");
+
+        assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start);
+
+        Collection<Integer> extra = new HashSet<>(map.keySet());
+
+        extra.removeAll(exp);
+
+        assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty());
+
+        Collection<Integer> missing = new HashSet<>(exp);
+
+        missing.removeAll(map.keySet());
+
+        assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty());
+
+        for (Integer key : exp)
+            assertEquals("Invalid value for key " + key, "val" + key, map.get(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
new file mode 100644
index 0000000..e9821fb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests write-behind store with near and dht commit option.
+ */
+public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest {
+    /** Grids to start. */
+    private static final int GRID_CNT = 5;
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Flush frequency. */
+    public static final int WRITE_BEHIND_FLUSH_FREQ = 1000;
+
+    /** Stores per grid. */
+    private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT];
+
+    /** Start grid counter. */
+    private int idx;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setCacheMode(CacheMode.PARTITIONED);
+        cc.setWriteBehindEnabled(true);
+        cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ);
+        cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setNearConfiguration(new NearCacheConfiguration());
+
+        CacheStore store = stores[idx] = new GridCacheTestStore();
+
+        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
+
+        c.setCacheConfiguration(cc);
+
+        idx++;
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stores = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void prepare() throws Exception {
+        idx = 0;
+
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleWritesOnDhtNode() throws Exception {
+        checkSingleWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBatchWritesOnDhtNode() throws Exception {
+        checkBatchWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWritesOnDhtNode() throws Exception {
+        checkTxWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkSingleWrites() throws Exception {
+        prepare();
+
+        IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, String.valueOf(i));
+
+        checkWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkBatchWrites() throws Exception {
+        prepare();
+
+        Map<Integer, String> map = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, String.valueOf(i));
+
+        grid(0).cache(null).putAll(map);
+
+        checkWrites();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkTxWrites() throws Exception {
+        prepare();
+
+        IgniteCache<Object, Object> cache = grid(0).cache(null);
+
+        try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int i = 0; i < 100; i++)
+                cache.put(i, String.valueOf(i));
+
+            tx.commit();
+        }
+
+        checkWrites();
+    }
+
+    /**
+     * @throws IgniteInterruptedCheckedException If sleep was interrupted.
+     */
+    private void checkWrites() throws IgniteInterruptedCheckedException {
+        U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2);
+
+        Collection<Integer> allKeys = new ArrayList<>(100);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            Map<Integer,String> map = stores[i].getMap();
+
+            assertFalse("Missing writes for node: " + i, map.isEmpty());
+
+            allKeys.addAll(map.keySet());
+
+            // Check there is no intersection.
+            for (int j = 0; j < GRID_CNT; j++) {
+                if (i == j)
+                    continue;
+
+                Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet());
+
+                intersection.retainAll(map.keySet());
+
+                assertTrue(intersection.isEmpty());
+            }
+        }
+
+        assertEquals(100, allKeys.size());
+
+        for (int i = 0; i < 100; i++)
+            assertTrue(allKeys.contains(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
new file mode 100644
index 0000000..fe589ca
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStorePartitionedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in partitioned configuration.
+ */
+public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
new file mode 100644
index 0000000..26f8431
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreReplicatedTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.cache.*;
+
+/**
+ * Tests {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore} in replicated configuration.
+ */
+public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
new file mode 100644
index 0000000..937e597
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.store;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * This class provides basic tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
+ */
+public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest {
+    /**
+     * Tests correct store shutdown when underlying store fails.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdownWithFailure() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    delegate.setShouldFail(true);
+
+                    initStore(2);
+
+                    try {
+                        store.write(new CacheEntryImpl<>(1, "val1"));
+                        store.write(new CacheEntryImpl<>(2, "val2"));
+                    }
+                    finally {
+                        shutdownStore();
+
+                        delegate.setShouldFail(false);
+                    }
+                }
+                catch (Exception e) {
+                    err.set(e);
+                }
+            }
+        }, 1).get();
+
+        if (err.get() != null)
+            throw err.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleStore() throws Exception {
+        initStore(2);
+
+        try {
+            store.write(new CacheEntryImpl<>(1, "v1"));
+            store.write(new CacheEntryImpl<>(2, "v2"));
+
+            assertEquals("v1", store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+
+            store.delete(1);
+
+            assertNull(store.load(1));
+            assertEquals("v2", store.load(2));
+            assertNull(store.load(3));
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Check that all values written to the store will be in underlying store after timeout or due to size limits.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"NullableProblems"})
+    public void testValuePropagation() throws Exception {
+        // Need to test size-based write.
+        initStore(1);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE * 2; i++)
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+            U.sleep(200);
+
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+
+            U.sleep(FLUSH_FREQUENCY + 300);
+
+            for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) {
+                String val = delegate.load(i);
+
+                assertNotNull("Value for [key= " + i + "] was not written in store", val);
+                assertEquals("Invalid value [key=" + i + "]", "val" + i, val);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+    }
+
+    /**
+     * Tests store behaviour under continuous put of the same key with different values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testContinuousPut() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            final AtomicInteger actualPutCnt = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                actualPutCnt.incrementAndGet();
+
+                                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+                                actualPutCnt.incrementAndGet();
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(FLUSH_FREQUENCY * 2 + 500);
+
+            int delegatePutCnt = delegate.getPutAllCount();
+
+            running.set(false);
+
+            fut.get();
+
+            log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]");
+
+            assertTrue("No puts were made to the underlying store", delegatePutCnt > 0);
+            assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10);
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store is shut down.
+        assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values that were put into the store will be written to the underlying store
+     * after shutdown is called.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShutdown() throws Exception {
+        initStore(2);
+
+        try {
+            final AtomicBoolean running = new AtomicBoolean(true);
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @SuppressWarnings({"NullableProblems"})
+                @Override public void run() {
+                    try {
+                        while (running.get()) {
+                            for (int i = 0; i < CACHE_SIZE; i++) {
+                                store.write(new CacheEntryImpl<>(i, "val-0"));
+
+                                store.write(new CacheEntryImpl<>(i, "val" + i));
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        error("Unexpected exception in put thread", e);
+
+                        assert false;
+                    }
+                }
+            }, 1, "put");
+
+            U.sleep(300);
+
+            running.set(false);
+
+            fut.get();
+        }
+        finally {
+            shutdownStore();
+        }
+
+        // These checks must be done after the store is shut down.
+        assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size());
+
+        for (int i = 0; i < CACHE_SIZE; i++)
+            assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i));
+    }
+
+    /**
+     * Tests that all values will be written to the underlying store
+     * right in the same order as they were put into the store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBatchApply() throws Exception {
+        delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>());
+
+        initStore(1);
+
+        List<Integer> intList = new ArrayList<>(CACHE_SIZE);
+
+        try {
+            for (int i = 0; i < CACHE_SIZE; i++) {
+                store.write(new CacheEntryImpl<>(i, "val" + i));
+
+                intList.add(i);
+            }
+        }
+        finally {
+            shutdownStore();
+        }
+
+        Map<Integer, String> underlyingMap = delegate.getMap();
+
+        assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 20a9caf..b277d48 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.jta.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.plugin.*;
@@ -59,10 +60,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             true,
             new GridCacheEventManager(),
             new GridCacheSwapManager(false),
-            new GridCacheStoreManager(null,
-                new IdentityHashMap<CacheStore, ThreadLocal>(),
-                null,
-                new CacheConfiguration()),
+            new CacheOsStoreManager(null, new CacheConfiguration()),
             new GridCacheEvictionManager(),
             new GridCacheLocalQueryManager<K, V>(),
             new CacheContinuousQueryManager(),
@@ -74,5 +72,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             new CacheOsConflictResolutionManager<K, V>(),
             new CachePluginManager(ctx, new CacheConfiguration())
         );
+
+        store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6629012/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
index 452dbf1..529b227 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java
@@ -19,9 +19,10 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 
 /**
- * Test suite that contains all tests for {@link GridCacheWriteBehindStore}.
+ * Test suite that contains all tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
  */
 public class IgniteCacheWriteBehindTestSuite extends TestSuite {
     /**


Mime
View raw message