ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [23/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core
Date Sun, 11 Jun 2017 20:03:49 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
new file mode 100644
index 0000000..d4dfdec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsTransactionsHangTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jsr166.LongAdder8;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Checks that transactions don't hang during checkpoint creation.
+ */
+public class IgnitePdsTransactionsHangTest extends GridCommonAbstractTest {
+    /** IP finder shared by every grid started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Page cache size, in megabytes (applied as {@code PAGE_CACHE_SIZE * 1024 * 1024} below). */
+    private static final int PAGE_CACHE_SIZE = 512;
+
+    /** Page size, in kilobytes (applied as {@code PAGE_SIZE * 1024} below). */
+    private static final Integer PAGE_SIZE = 16;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "IgnitePdsTransactionsHangTest";
+
+    /** Number of insert threads. */
+    public static final int THREADS_CNT = 32;
+
+    /** Warm up period, seconds; throughput samples taken during warm-up are excluded from the stats. */
+    public static final int WARM_UP_PERIOD = 20;
+
+    /** Total load duration, seconds. */
+    public static final int DURATION = 180;
+
+    /** Maximum count of inserted keys. */
+    public static final int MAX_KEY_COUNT = 500_000;
+
+    /** Checkpoint frequency, milliseconds. */
+    public static final long CHECKPOINT_FREQUENCY = 20_000;
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // The test bounds its own runtime via DURATION, so the framework timeout is disabled.
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Start from a clean persistent store.
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Clean up the persistent store so subsequent tests are unaffected.
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+        cfg.setDiscoverySpi(discoSpi);
+
+        BinaryConfiguration binaryCfg = new BinaryConfiguration();
+        binaryCfg.setCompactFooter(false);
+        cfg.setBinaryConfiguration(binaryCfg);
+
+        cfg.setPeerClassLoadingEnabled(true);
+
+        TcpCommunicationSpi tcpCommSpi = new TcpCommunicationSpi();
+
+        tcpCommSpi.setSharedMemoryPort(-1);
+        cfg.setCommunicationSpi(tcpCommSpi);
+
+        TransactionConfiguration txCfg = new TransactionConfiguration();
+
+        txCfg.setDefaultTxIsolation(TransactionIsolation.READ_COMMITTED);
+
+        cfg.setTransactionConfiguration(txCfg);
+
+        // Frequent checkpoints (every CHECKPOINT_FREQUENCY ms) so the transactional load
+        // overlaps many checkpoint cycles during DURATION seconds.
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalHistorySize(1)
+                .setCheckpointingFrequency(CHECKPOINT_FREQUENCY)
+        );
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(PAGE_CACHE_SIZE * 1024 * 1024);
+        memPlcCfg.setMaxSize(PAGE_CACHE_SIZE * 1024 * 1024);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setMemoryPolicies(memPlcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        memCfg.setPageSize(PAGE_SIZE * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration: transactional, partitioned, FULL_SYNC writes.
+     *
+     * @return Cache configuration.
+     * */
+    private CacheConfiguration getCacheConfiguration() {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 64 * 4));
+        ccfg.setReadFromBackup(true);
+
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+        return ccfg;
+    }
+
+    /**
+     * Copied from customers benchmark.
+     *
+     * Runs {@link #THREADS_CNT} threads doing PESSIMISTIC/REPEATABLE_READ put transactions over
+     * random keys for {@link #DURATION} seconds while checkpoints run concurrently, then verifies
+     * that no transactions are left active (a non-empty active set indicates a hang).
+     *
+     * @throws Exception If failed.
+     * */
+    public void testTransactionsDontHang() throws Exception {
+        try {
+            final Ignite g = startGrids(2);
+
+            g.getOrCreateCache(getCacheConfiguration());
+
+            ExecutorService threadPool = Executors.newFixedThreadPool(THREADS_CNT);
+            final CyclicBarrier cyclicBarrier = new CyclicBarrier(THREADS_CNT);
+
+            final AtomicBoolean interrupt = new AtomicBoolean(false);
+            final LongAdder8 operationCnt = new LongAdder8();
+
+            final IgniteCache<Long, TestEntity> cache = g.cache(CACHE_NAME);
+
+            for (int i = 0; i < THREADS_CNT; i++) {
+                // NOTE(review): the Future returned by submit() is dropped, so a worker failure
+                // only surfaces via stdout and a stalled operation counter.
+                threadPool.submit(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            ThreadLocalRandom locRandom = ThreadLocalRandom.current();
+
+                            // Make all workers start the transactional load simultaneously.
+                            cyclicBarrier.await();
+
+                            while (!interrupt.get()) {
+                                long randomKey = locRandom.nextLong(MAX_KEY_COUNT);
+
+                                TestEntity entity = TestEntity.newTestEntity(locRandom);
+
+                                try (Transaction tx = g.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                    cache.put(randomKey, entity);
+
+                                    tx.commit();
+                                }
+
+                                operationCnt.increment();
+                            }
+                        }
+                        catch (Throwable e) {
+                            System.out.println(e.toString());
+
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            }
+
+            long stopTime = System.currentTimeMillis() + DURATION * 1000;
+            long totalOperations = 0;
+            int periods = 0;
+            long max = Long.MIN_VALUE, min = Long.MAX_VALUE;
+
+            // Sample throughput once a second; only samples past the warm-up window count.
+            while (System.currentTimeMillis() < stopTime) {
+                U.sleep(1000);
+
+                long sum = operationCnt.sumThenReset();
+                periods++;
+
+                if (periods > WARM_UP_PERIOD) {
+                    totalOperations += sum;
+
+                    max = Math.max(max, sum);
+                    min = Math.min(min, sum);
+
+                    System.out.println("Operation count: " + sum + " min=" + min + " max=" + max + " avg=" + totalOperations / (periods - WARM_UP_PERIOD));
+                }
+            }
+
+            interrupt.set(true);
+
+            threadPool.shutdown();
+            System.out.println("Test complete");
+
+            threadPool.awaitTermination(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+            IgniteTxManager tm = internalCache(cache).context().tm();
+
+            // Any transaction that hung during a checkpoint is still registered as active here.
+            assertEquals("There are still active transactions", 0, tm.activeTransactions().size());
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Entity for test.
+     * */
+    public static class TestEntity {
+        /** String value. */
+        private String strVal;
+
+        /** Long value. */
+        private Long longVal;
+
+        /** Int value. */
+        private int intVal;
+
+        /**
+         * @param strVal String value.
+         */
+        public void setStrVal(String strVal) {
+            this.strVal = strVal;
+        }
+
+        /**
+         * @param longVal Long value.
+         */
+        public void setLongVal(Long longVal) {
+            this.longVal = longVal;
+        }
+
+        /**
+         * @param intVal Integer value.
+         */
+        public void setIntVal(int intVal) {
+            this.intVal = intVal;
+        }
+
+        /**
+         * Creates test entity with random values (each field drawn from [0, 1000)).
+         *
+         * @param random Random seq generator.
+         * @return new test entity
+         * */
+        private static TestEntity newTestEntity(Random random) {
+            TestEntity entity = new TestEntity();
+
+            entity.setLongVal((long) random.nextInt(1_000));
+            entity.setIntVal(random.nextInt(1_000));
+            entity.setStrVal("test" + random.nextInt(1_000));
+
+            return entity;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
new file mode 100644
index 0000000..b512a64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWholeClusterRestartTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgnitePdsWholeClusterRestartTest extends GridCommonAbstractTest {
+    /** Number of nodes in the cluster. */
+    private static final int GRID_CNT = 5;
+
+    /** Number of entries loaded before the restart rounds. */
+    private static final int ENTRIES_COUNT = 1_000;
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(100 * 1024 * 1024);
+        memPlcCfg.setMaxSize(100 * 1024 * 1024);
+
+        dbCfg.setMemoryPolicies(memPlcCfg);
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setName(CACHE_NAME);
+        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg1.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg1.setBackups(2);
+
+        cfg.setLateAffinityAssignment(false);
+        // Cluster starts inactive; the test activates it explicitly after all nodes join.
+        cfg.setActiveOnStart(false);
+
+        // To avoid hostname lookup on start.
+        cfg.setCheckpointSpi(new NoopCheckpointSpi());
+
+        cfg.setCacheConfiguration(ccfg1);
+
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalMode(WALMode.LOG_ONLY)
+        );
+
+        // Stable consistent ID so a restarted node maps back to its own persistent store files.
+        cfg.setConsistentId(gridName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /**
+     * Removes the persistent store ("db") work directory.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * Loads {@link #ENTRIES_COUNT} entries, then performs 10 rounds of full-cluster restart
+     * in random node order and verifies every entry is readable from every node after
+     * activation.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRestarts() throws Exception {
+        startGrids(GRID_CNT);
+
+        ignite(0).active(true);
+
+        awaitPartitionMapExchange();
+
+        try (IgniteDataStreamer<Object, Object> ds = ignite(0).dataStreamer(CACHE_NAME)) {
+            for (int i = 0; i < ENTRIES_COUNT; i++)
+                ds.addData(i, i);
+        }
+
+        stopAllGrids();
+
+        List<Integer> idxs = new ArrayList<>();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            idxs.add(i);
+
+        for (int r = 0; r < 10; r++) {
+            // Random start order each round to exercise different join/recovery sequences.
+            Collections.shuffle(idxs);
+
+            info("Will start in the following order: " + idxs);
+
+            for (Integer idx : idxs)
+                startGrid(idx);
+
+            try {
+                ignite(0).active(true);
+
+                for (int g = 0; g < GRID_CNT; g++) {
+                    Ignite ig = ignite(g);
+
+                    for (int k = 0; k < ENTRIES_COUNT; k++)
+                        assertEquals("Failed to read [g=" + g + ", part=" + ig.affinity(CACHE_NAME).partition(k) +
+                            ", nodes=" + ig.affinity(CACHE_NAME).mapKeyToPrimaryAndBackups(k) + ']',
+                            k, ig.cache(CACHE_NAME).get(k));
+                }
+            }
+            finally {
+                stopAllGrids();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
new file mode 100644
index 0000000..c68f7e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCacheIntegrationTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgnitePdsCacheIntegrationTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(100 * 1024 * 1024);
+        memPlcCfg.setMaxSize(100 * 1024 * 1024);
+
+        dbCfg.setMemoryPolicies(memPlcCfg);
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalMode(WALMode.LOG_ONLY)
+        );
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
+
+        ccfg.setIndexedTypes(Integer.class, DbValue.class);
+
+        ccfg.setRebalanceMode(CacheRebalanceMode.NONE);
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setMarshaller(null);
+
+        BinaryConfiguration bCfg = new BinaryConfiguration();
+
+        bCfg.setCompactFooter(false);
+
+        cfg.setBinaryConfiguration(bCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testPutGetSimple() throws Exception {
+        startGrids(GRID_CNT);
+
+        try {
+            IgniteEx ig = grid(0);
+
+            checkPutGetSql(ig, true);
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+        info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+        info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+
+        startGrids(GRID_CNT);
+
+        try {
+            IgniteEx ig = grid(0);
+
+            checkPutGetSql(ig, false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testPutMultithreaded() throws Exception {
+        startGrids(4);
+
+        try {
+            final IgniteEx grid = grid(0);
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    for (int i = 0; i < 1000; i++)
+                        grid.cache(CACHE_NAME).put(i, i);
+
+                    return null;
+                }
+            }, 8, "updater");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @param write Write flag.
+     */
+    private void checkPutGetSql(Ignite ig, boolean write) {
+        IgniteCache<Integer, DbValue> cache = ig.cache(CACHE_NAME);
+
+        int entryCnt = 50_000;
+
+        if (write) {
+            try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(CACHE_NAME)) {
+                streamer.allowOverwrite(true);
+
+                for (int i = 0; i < entryCnt; i++)
+                    streamer.addData(i, new DbValue(i, "value-" + i, i));
+            }
+        }
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteEx ignite = grid(i);
+
+            GridCacheAdapter<Object, Object> cache0 = ignite.context().cache().internalCache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                assertNull(cache0.peekEx(i));
+
+            assertEquals(entryCnt, ignite.cache(CACHE_NAME).size());
+        }
+
+        for (int i = 0; i < entryCnt; i++)
+            assertEquals("i = " + i, new DbValue(i, "value-" + i, i), cache.get(i));
+
+        List<List<?>> res = cache.query(new SqlFieldsQuery("select ival from dbvalue where ival < ? order by ival asc")
+            .setArgs(10_000)).getAll();
+
+        assertEquals(10_000, res.size());
+
+        for (int i = 0; i < 10_000; i++) {
+            assertEquals(1, res.get(i).size());
+            assertEquals(i, res.get(i).get(0));
+        }
+
+        assertEquals(1, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival = 7899")).getAll().size());
+        assertEquals(5000, cache.query(new SqlFieldsQuery("select lval from dbvalue where ival >= 5000 and ival < 10000"))
+            .getAll().size());
+
+        for (int i = 0; i < 10_000; i++)
+            assertEquals(new DbValue(i, "value-" + i, i), cache.get(i));
+    }
+
+    /**
+     *
+     */
+    private static class DbValue implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String sVal;
+
+        /** */
+        @QuerySqlField
+        private long lVal;
+
+        /**
+         * @param iVal Integer value.
+         * @param sVal String value.
+         * @param lVal Long value.
+         */
+        private DbValue(int iVal, String sVal, long lVal) {
+            this.iVal = iVal;
+            this.sVal = sVal;
+            this.lVal = lVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            DbValue dbVal = (DbValue)o;
+
+            return iVal == dbVal.iVal && lVal == dbVal.lVal &&
+                !(sVal != null ? !sVal.equals(dbVal.sVal) : dbVal.sVal != null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = iVal;
+
+            res = 31 * res + (sVal != null ? sVal.hashCode() : 0);
+            res = 31 * res + (int)(lVal ^ (lVal >>> 32));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DbValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
new file mode 100644
index 0000000..1026d4e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for page evictions.
+ * <p>
+ * Allocates {@link #PAGES_NUM} pages of {@link #PAGE_SIZE} bytes (~128 MB of page data)
+ * while the memory policy is capped at {@link #MEMORY_LIMIT} (10 MB), so the concurrent
+ * write and read phases must evict pages to the persistent store and load them back.
+ */
+public class IgnitePdsEvictionTest extends GridCommonAbstractTest {
+    /** Number of concurrent writer/reader segments; also used as page memory concurrency level. */
+    private static final int NUMBER_OF_SEGMENTS = 64;
+
+    /** Page size in bytes. */
+    private static final int PAGE_SIZE = 1024;
+
+    /** One megabyte, used to express the memory limit. */
+    private static final long CHUNK_SIZE = 1024 * 1024;
+
+    /** Memory policy cap (10 MB) — deliberately far smaller than the total page data. */
+    private static final long MEMORY_LIMIT = 10 * CHUNK_SIZE;
+
+    /** Total number of pages to allocate (128 MB at PAGE_SIZE, forcing evictions). */
+    private static final int PAGES_NUM = 128_000;
+
+    /** Cache name. */
+    private final String cacheName = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
+
+        cfg.setMemoryConfiguration(createDbConfig());
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName));
+
+        return cfg;
+    }
+
+    /**
+     * Builds a memory configuration with a single small memory policy so page
+     * allocation pressure triggers evictions.
+     *
+     * @return DB config.
+     */
+    private MemoryConfiguration createDbConfig() {
+        final MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+        memPlcCfg.setInitialSize(MEMORY_LIMIT);
+        memPlcCfg.setMaxSize(MEMORY_LIMIT);
+        memPlcCfg.setName("dfltMemPlc");
+
+        memCfg.setPageSize(PAGE_SIZE);
+        memCfg.setConcurrencyLevel(NUMBER_OF_SEGMENTS);
+        memCfg.setMemoryPolicies(memPlcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        return memCfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Start from a clean persistence directory.
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /**
+     * Entry point: starts one node and drives write/read phases that exceed the memory limit.
+     *
+     * @throws Exception If fail.
+     */
+    public void testPageEviction() throws Exception {
+        final IgniteEx ig = startGrid(0);
+
+        final PageMemory memory = getMemory(ig);
+
+        writeData(ig, memory, CU.cacheId(cacheName));
+    }
+
+    /**
+     * Allocates {@link #PAGES_NUM} pages, then writes and reads them from
+     * {@link #NUMBER_OF_SEGMENTS} concurrent workers, causing evictions in both phases.
+     *
+     * @param ignite Ignite instance.
+     * @param memory Page memory.
+     * @param cacheId Cache id.
+     * @throws Exception If failed.
+     */
+    private void writeData(final IgniteEx ignite, final PageMemory memory, final int cacheId) throws Exception {
+        final int size = PAGES_NUM;
+
+        final List<FullPageId> pageIds = new ArrayList<>(size);
+
+        IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
+
+        // Allocate. Each allocation is done under the checkpoint read lock, matching the
+        // locking pattern used by the write/read workers below.
+        for (int i = 0; i < size; i++) {
+            db.checkpointReadLock();
+            try {
+                // i % 256 spreads allocations across partitions.
+                final FullPageId fullId = new FullPageId(memory.allocatePage(cacheId, i % 256, PageMemory.FLAG_DATA),
+                    cacheId);
+
+                pageIds.add(fullId);
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+
+        System.out.println("Allocated pages: " + pageIds.size());
+
+        // Write data. (Causes evictions.)
+        final int part = PAGES_NUM / NUMBER_OF_SEGMENTS;
+
+        final Collection<IgniteInternalFuture> futs = new ArrayList<>();
+
+        for (int i = 0; i < PAGES_NUM; i += part)
+            futs.add(runWriteInThread(ignite, i, i + part, memory, pageIds));
+
+        for (final IgniteInternalFuture fut : futs)
+            fut.get();
+
+        System.out.println("Wrote pages: " + pageIds.size());
+
+        // Read data. (Causes evictions.)
+        futs.clear();
+
+        for (int i = 0; i < PAGES_NUM; i += part)
+            futs.add(runReadInThread(ignite, i, i + part, memory, pageIds));
+
+        for (final IgniteInternalFuture fut : futs)
+            fut.get();
+
+        System.out.println("Read pages: " + pageIds.size());
+    }
+
+    /**
+     * Asynchronously writes a marker value ({@code i * 2}) into pages [start, end),
+     * later verified by {@link #runReadInThread}.
+     *
+     * @param ignite Ignite instance.
+     * @param start Start index.
+     * @param end End index.
+     * @param memory PageMemory.
+     * @param pageIds Allocated pages.
+     * @return Future.
+     * @throws Exception If fail.
+     */
+    private IgniteInternalFuture runWriteInThread(
+        final IgniteEx ignite,
+        final int start,
+        final int end,
+        final PageMemory memory,
+        final List<FullPageId> pageIds
+    ) throws Exception {
+
+        return GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
+
+                for (int i = start; i < end; i++) {
+                    // Lock order: checkpoint read lock -> page acquire -> page write lock.
+                    db.checkpointReadLock();
+
+                    try {
+                        FullPageId fullId = pageIds.get(i);
+
+                        long page = memory.acquirePage(fullId.cacheId(), fullId.pageId());
+
+                        try {
+                            final long pageAddr = memory.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+                            try {
+                                PageIO.setPageId(pageAddr, fullId.pageId());
+
+                                // Marker value checked by the read phase.
+                                PageUtils.putLong(pageAddr, PageIO.COMMON_HEADER_END, i * 2);
+                            }
+                            finally {
+                                memory.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true);
+                            }
+                        }
+                        finally {
+                            memory.releasePage(fullId.cacheId(), fullId.pageId(), page);
+                        }
+                    }
+                    finally {
+                        db.checkpointReadUnlock();
+                    }
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Asynchronously reads pages [start, end) back and asserts the marker value
+     * written by {@link #runWriteInThread} survived eviction.
+     *
+     * @param ignite Ignite instance.
+     * @param start Start index.
+     * @param end End index.
+     * @param memory PageMemory.
+     * @param pageIds Allocated pages.
+     * @return Future.
+     * @throws Exception If fail.
+     */
+    private IgniteInternalFuture runReadInThread(final IgniteEx ignite, final int start, final int end,
+        final PageMemory memory,
+        final List<FullPageId> pageIds) throws Exception {
+        return GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
+
+                for (int i = start; i < end; i++) {
+                    db.checkpointReadLock();
+
+                    try {
+                        final FullPageId fullId = pageIds.get(i);
+
+                        long page = memory.acquirePage(fullId.cacheId(), fullId.pageId());
+                        try {
+                            final long pageAddr = memory.readLock(fullId.cacheId(), fullId.pageId(), page);
+
+                            try {
+                                assertEquals(i * 2, PageUtils.getLong(pageAddr, PageIO.COMMON_HEADER_END));
+                            }
+                            finally {
+                                memory.readUnlock(fullId.cacheId(), fullId.pageId(), page);
+                            }
+                        }
+                        finally {
+                            memory.releasePage(fullId.cacheId(), fullId.pageId(), page);
+                        }
+                    }
+                    finally {
+                        db.checkpointReadUnlock();
+                    }
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @return Page memory of the default memory policy.
+     * @throws Exception If failed to initialize the store.
+     */
+    private PageMemory getMemory(IgniteEx ig) throws Exception {
+        final GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context();
+
+        final IgniteCacheDatabaseSharedManager db = sharedCtx.database();
+
+        return db.memoryPolicy(null).pageMemory();
+    }
+
+    /**
+     * Removes the persistence work directory ("db") recursively.
+     *
+     * @throws IgniteCheckedException If fail.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsNoActualWalHistoryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsNoActualWalHistoryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsNoActualWalHistoryTest.java
new file mode 100644
index 0000000..9631686
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsNoActualWalHistoryTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks node recovery when the retained WAL history is small: the test configures
+ * {@code walHistorySize=2}, truncates history by running many checkpoints, then applies
+ * updates with checkpoints disabled and verifies all data survives a restart.
+ */
+public class IgnitePdsNoActualWalHistoryTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setPageSize(4 * 1024);
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        // Small segments and a history of only 2 checkpoints so the WAL history is
+        // cleaned quickly by the checkpoints issued in the test body.
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalSegmentSize(4 * 1024 * 1024)
+                .setWalHistorySize(2)
+                .setWalSegments(10)
+                .setWalMode(WALMode.LOG_ONLY)
+        );
+
+        // null resets any marshaller set by the base class; presumably falls back to the
+        // default binary marshaller — confirm against the base test configuration.
+        cfg.setMarshaller(null);
+
+        BinaryConfiguration binCfg = new BinaryConfiguration();
+
+        binCfg.setCompactFooter(false);
+
+        cfg.setBinaryConfiguration(binCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        // Each test starts from an empty persistence directory.
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * Loads data, truncates WAL history via repeated checkpoints, applies more updates
+     * with checkpoints disabled, restarts and verifies every key.
+     *
+     * @throws Exception if failed.
+     */
+    public void testWalBig() throws Exception {
+        try {
+            IgniteEx ignite = startGrid(1);
+
+            IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+
+            Random rnd = new Random();
+
+            // Shadow map holds the expected cache contents for the final verification.
+            Map<Integer, IndexedObject> map = new HashMap<>();
+
+            for (int i = 0; i < 40_000; i++) {
+                if (i % 1000 == 0)
+                    X.println(" >> " + i);
+
+                int k = rnd.nextInt(300_000);
+                IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
+
+                cache.put(k, v);
+                map.put(k, v);
+            }
+
+            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ignite.context().cache().context()
+                .database();
+
+            // Create many checkpoints to clean up the history (walHistorySize is 2).
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+            dbMgr.wakeupForCheckpoint("test").get();
+
+            // Subsequent updates reach only the WAL, so restart must recover them from it.
+            dbMgr.enableCheckpoints(false).get();
+
+            for (int i = 0; i < 50; i++) {
+                int k = rnd.nextInt(300_000);
+                IndexedObject v = new IndexedObject(rnd.nextInt(10_000));
+
+                cache.put(k, v);
+                map.put(k, v);
+            }
+
+            stopGrid(1);
+
+            ignite = startGrid(1);
+
+            cache = ignite.cache(CACHE_NAME);
+
+            // Check.
+            for (Integer k : map.keySet())
+                assertEquals(map.get(k), cache.get(k));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test value object with an indexed field and a ~1 KB payload to inflate WAL volume.
+     */
+    private static class IndexedObject {
+        /** Indexed integer value; also the identity used by equals/hashCode. */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /** Filler payload to make each entry about 1 KB. */
+        private byte[] payload = new byte[1024];
+
+        /**
+         * @param iVal Integer value.
+         */
+        private IndexedObject(int iVal) {
+            this.iVal = iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof IndexedObject))
+                return false;
+
+            IndexedObject that = (IndexedObject)o;
+
+            return iVal == that.iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IndexedObject.class, this);
+        }
+    }
+
+    /**
+     * Sample enum. NOTE(review): not referenced anywhere in this excerpt — confirm it is
+     * used (e.g. via SQL metadata) before removing.
+     */
+    private enum EnumVal {
+        /** */
+        VAL1,
+
+        /** */
+        VAL2,
+
+        /** */
+        VAL3
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/ignitePdsCheckpointSimulationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/ignitePdsCheckpointSimulationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/ignitePdsCheckpointSimulationTest.java
new file mode 100644
index 0000000..840042e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/ignitePdsCheckpointSimulationTest.java
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.nio.ByteOrder;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class ignitePdsCheckpointSimulationTest extends GridCommonAbstractTest {
+    /** Shared in-JVM IP finder for discovery between restarted test nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of test pages (used by the checkpoint simulation outside this excerpt). */
+    private static final int TOTAL_PAGES = 1000;
+
+    /** Verbose output flag (consumed elsewhere in this class). */
+    private static final boolean VERBOSE = false;
+
+    /** Cache name. */
+    private final String cacheName = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+        // Single-node tests: rebalancing would only add noise to WAL/page-store state.
+        ccfg.setRebalanceMode(CacheRebalanceMode.NONE);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        // Full page snapshots in the WAL make replay-based verification deterministic.
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setCheckpointingFrequency(500)
+                .setWalMode(WALMode.LOG_ONLY)
+                .setAlwaysWriteFullPages(true)
+        );
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        // Each test starts from an empty persistent store.
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        // Leave no persistence files behind for the next test.
+        deleteWorkFiles();
+    }
+
+    /**
+     * Runs a multi-threaded checkpointing simulation with the integrated checkpointer
+     * disabled, restarts the node and verifies the recorded pages. The driver
+     * ({@code runCheckpointing}) and verifier ({@code verifyReads}) are defined elsewhere
+     * in this class.
+     *
+     * @throws Exception if failed.
+     */
+    public void testCheckpointSimulationMultiThreaded() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        GridCacheSharedContext<Object, Object> shared = ig.context().cache().context();
+
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database();
+
+        IgnitePageStoreManager pageStore = shared.pageStore();
+
+        // NOTE(review): fixed settle delay before disabling checkpoints — confirm necessity.
+        U.sleep(1_000);
+
+        // Disable integrated checkpoint thread.
+        dbMgr.enableCheckpoints(false).get();
+
+        // Must put something in partition 0 in order to initialize meta page.
+        // Otherwise we will violate page store integrity rules.
+        ig.cache(cacheName).put(0, 0);
+
+        PageMemory mem = shared.database().memoryPolicy(null).pageMemory();
+
+        IgniteBiTuple<Map<FullPageId, Integer>, WALPointer> res;
+
+        try {
+            res = runCheckpointing(ig, (PageMemoryImpl)mem, pageStore, shared.wal(),
+                shared.cache().cache(cacheName).context().cacheId());
+        }
+        catch (Throwable th) {
+            log().error("Error while running checkpointing", th);
+
+            throw th;
+        }
+        finally {
+            // Re-enable checkpoints so the node can stop cleanly.
+            dbMgr.enableCheckpoints(true).get();
+
+            stopAllGrids(false);
+        }
+
+        ig = startGrid(0);
+
+        shared = ig.context().cache().context();
+
+        dbMgr = (GridCacheDatabaseSharedManager)shared.database();
+
+        dbMgr.enableCheckpoints(false).get();
+
+        mem = shared.database().memoryPolicy(null).pageMemory();
+
+        // Verify that what was simulated before restart is what the restarted node reads.
+        verifyReads(res.get1(), mem, res.get2(), shared.wal());
+    }
+
+    /**
+     * Verifies that pages initialized for first write are logged to the WAL as full page
+     * snapshots and appear in replay order after a restart (extra tracking-page snapshots
+     * are skipped).
+     *
+     * @throws Exception if failed.
+     */
+    public void testGetForInitialWrite() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        GridCacheSharedContext<Object, Object> shared = ig.context().cache().context();
+
+        int cacheId = shared.cache().cache(cacheName).context().cacheId();
+
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database();
+
+        // Disable integrated checkpoint thread and wait for the switch to complete.
+        // (The returned future was previously ignored, leaving a window for a concurrent
+        // checkpoint; the sibling multi-threaded test waits on it with get().)
+        dbMgr.enableCheckpoints(false).get();
+
+        PageMemory mem = shared.database().memoryPolicy(null).pageMemory();
+
+        IgniteWriteAheadLogManager wal = shared.wal();
+
+        // Replay marker: everything after this checkpoint record is inspected below.
+        WALPointer start = wal.log(new CheckpointRecord(null, false));
+
+        final FullPageId[] initWrites = new FullPageId[10];
+
+        ig.context().cache().context().database().checkpointReadLock();
+
+        try {
+            for (int i = 0; i < initWrites.length; i++)
+                initWrites[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+
+            // Check getForInitialWrite methods.
+            for (FullPageId fullId : initWrites) {
+                long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+                try {
+                    long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+                    try {
+                        DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+
+                        // Fill the payload with a recognizable pattern.
+                        for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                            PageUtils.putByte(pageAddr, i, (byte)0xAB);
+                    }
+                    finally {
+                        mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true);
+                    }
+                }
+                finally {
+                    mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+                }
+            }
+
+            // Make sure all logged records hit disk before the node is stopped.
+            wal.fsync(null);
+        }
+        finally {
+            ig.context().cache().context().database().checkpointReadUnlock();
+            stopAllGrids(false);
+        }
+
+        ig = startGrid(0);
+
+        shared = ig.context().cache().context();
+
+        dbMgr = (GridCacheDatabaseSharedManager)shared.database();
+
+        // Same fix as above: wait until checkpoints are actually disabled before replaying.
+        dbMgr.enableCheckpoints(false).get();
+
+        wal = shared.wal();
+
+        try (WALIterator it = wal.replay(start)) {
+            it.nextX();
+
+            for (FullPageId initialWrite : initWrites) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+                assertTrue(String.valueOf(tup.get2()), tup.get2() instanceof PageSnapshot);
+
+                PageSnapshot snap = (PageSnapshot)tup.get2();
+
+                FullPageId actual = snap.fullPageId();
+
+                //there are extra tracking pages, skip them
+                if (TrackingPageIO.VERSIONS.latest().trackingPageFor(actual.pageId(), mem.pageSize()) == actual.pageId()) {
+                    tup = it.nextX();
+
+                    assertTrue(tup.get2() instanceof PageSnapshot);
+
+                    actual = ((PageSnapshot)tup.get2()).fullPageId();
+                }
+
+                assertEquals(initialWrite, actual);
+            }
+        }
+    }
+
+    /**
+     * Logs a sequence of data entries (alternating UPDATE/DELETE) between two checkpoint
+     * markers, restarts the node without checkpointing, and verifies the entries are
+     * replayed from the WAL byte-for-byte.
+     *
+     * @throws Exception if failed.
+     */
+    public void testDataWalEntries() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context();
+        GridCacheContext<Object, Object> cctx = sharedCtx.cache().cache(cacheName).context();
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();
+        IgniteWriteAheadLogManager wal = sharedCtx.wal();
+
+        assertTrue(wal.isAlwaysWriteFullPages());
+
+        // Wait for the checkpointer to be disabled; the returned future was previously
+        // ignored, unlike in testCheckpointSimulationMultiThreaded which calls get().
+        db.enableCheckpoints(false).get();
+
+        final int cnt = 10;
+
+        List<DataEntry> entries = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            GridCacheOperation op = i % 2 == 0 ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+
+            KeyCacheObject key = cctx.toCacheKeyObject(i);
+
+            CacheObject val = null;
+
+            // DELETE entries carry no value.
+            if (op != GridCacheOperation.DELETE)
+                val = cctx.toCacheObject("value-" + i);
+
+            entries.add(new DataEntry(cctx.cacheId(), key, val, op, null, cctx.versions().next(), 0L,
+                cctx.affinity().partition(i), i));
+        }
+
+        UUID cpId = UUID.randomUUID();
+
+        // Begin-checkpoint marker before the data records.
+        WALPointer start = wal.log(new CheckpointRecord(cpId, null, false));
+
+        wal.fsync(start);
+
+        for (DataEntry entry : entries)
+            wal.log(new DataRecord(entry));
+
+        // End-checkpoint marker referencing the begin marker.
+        WALPointer end = wal.log(new CheckpointRecord(cpId, start, true));
+
+        wal.fsync(end);
+
+        // Data will not be written to the page store.
+        stopAllGrids();
+
+        ig = startGrid(0);
+
+        sharedCtx = ig.context().cache().context();
+        cctx = sharedCtx.cache().cache(cacheName).context();
+
+        db = (GridCacheDatabaseSharedManager)sharedCtx.database();
+        wal = sharedCtx.wal();
+
+        // Same fix as above: make sure no checkpoint can run during WAL replay.
+        db.enableCheckpoints(false).get();
+
+        try (WALIterator it = wal.replay(start)) {
+            IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+            assert tup.get2() instanceof CheckpointRecord;
+
+            assertEquals(start, tup.get1());
+
+            CheckpointRecord cpRec = (CheckpointRecord)tup.get2();
+
+            assertEquals(cpId, cpRec.checkpointId());
+            assertNull(cpRec.checkpointMark());
+            assertFalse(cpRec.end());
+
+            int idx = 0;
+            CacheObjectContext coctx = cctx.cacheObjectContext();
+
+            while (idx < entries.size()) {
+                tup = it.nextX();
+
+                assert tup.get2() instanceof DataRecord;
+
+                DataRecord dataRec = (DataRecord)tup.get2();
+
+                DataEntry entry = entries.get(idx);
+
+                assertEquals(1, dataRec.writeEntries().size());
+
+                DataEntry readEntry = dataRec.writeEntries().get(0);
+
+                assertEquals(entry.cacheId(), readEntry.cacheId());
+                assertEquals(entry.key().<Integer>value(coctx, true), readEntry.key().<Integer>value(coctx, true));
+                assertEquals(entry.op(), readEntry.op());
+
+                if (entry.op() == GridCacheOperation.UPDATE)
+                    assertEquals(entry.value().value(coctx, true), readEntry.value().value(coctx, true));
+                else
+                    assertNull(entry.value());
+
+                assertEquals(entry.writeVersion(), readEntry.writeVersion());
+                assertEquals(entry.nearXidVersion(), readEntry.nearXidVersion());
+                assertEquals(entry.partitionCounter(), readEntry.partitionCounter());
+
+                idx++;
+            }
+
+            tup = it.nextX();
+
+            assert tup.get2() instanceof CheckpointRecord;
+
+            assertEquals(end, tup.get1());
+
+            cpRec = (CheckpointRecord)tup.get2();
+
+            assertEquals(cpId, cpRec.checkpointId());
+            assertEquals(start, cpRec.checkpointMark());
+            assertTrue(cpRec.end());
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testPageWalEntries() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context();
+        int cacheId = sharedCtx.cache().cache(cacheName).context().cacheId();
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();
+        PageMemory pageMem = sharedCtx.database().memoryPolicy(null).pageMemory();
+        IgniteWriteAheadLogManager wal = sharedCtx.wal();
+
+        db.enableCheckpoints(false).get();
+
+        int pageCnt = 100;
+
+        List<FullPageId> pageIds = new ArrayList<>();
+
+        for (int i = 0; i < pageCnt; i++) {
+            db.checkpointReadLock();
+            try {
+                pageIds.add(new FullPageId(pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION,
+                    PageIdAllocator.FLAG_IDX), cacheId));
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+
+        UUID cpId = UUID.randomUUID();
+
+        WALPointer start = wal.log(new CheckpointRecord(cpId, null, false));
+
+        wal.fsync(start);
+
+        ig.context().cache().context().database().checkpointReadLock();
+
+        try {
+            for (FullPageId pageId : pageIds)
+                writePageData(pageId, pageMem);
+        }
+        finally {
+            ig.context().cache().context().database().checkpointReadUnlock();
+        }
+
+        WALPointer end = wal.log(new CheckpointRecord(cpId, start, true));
+
+        wal.fsync(end);
+
+        // Data will not be written to the page store.
+        stopAllGrids();
+
+        ig = startGrid(0);
+
+        sharedCtx = ig.context().cache().context();
+
+        db = (GridCacheDatabaseSharedManager)sharedCtx.database();
+        wal = sharedCtx.wal();
+
+        db.enableCheckpoints(false);
+
+        try (WALIterator it = wal.replay(start)) {
+            IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+            assert tup.get2() instanceof CheckpointRecord : tup.get2();
+
+            assertEquals(start, tup.get1());
+
+            CheckpointRecord cpRec = (CheckpointRecord)tup.get2();
+
+            assertEquals(cpId, cpRec.checkpointId());
+            assertNull(cpRec.checkpointMark());
+            assertFalse(cpRec.end());
+
+            int idx = 0;
+
+            while (idx < pageIds.size()) {
+                tup = it.nextX();
+
+                assert tup.get2() instanceof PageSnapshot : tup.get2().getClass();
+
+                PageSnapshot snap = (PageSnapshot)tup.get2();
+
+                //there are extra tracking pages, skip them
+                if (TrackingPageIO.VERSIONS.latest().trackingPageFor(snap.fullPageId().pageId(), pageMem.pageSize()) == snap.fullPageId().pageId()) {
+                    tup = it.nextX();
+
+                    assertTrue(tup.get2() instanceof PageSnapshot);
+
+                    snap = (PageSnapshot)tup.get2();
+                }
+
+                assertEquals(pageIds.get(idx), snap.fullPageId());
+
+                idx++;
+            }
+
+            tup = it.nextX();
+
+            assert tup.get2() instanceof CheckpointRecord;
+
+            assertEquals(end, tup.get1());
+
+            cpRec = (CheckpointRecord)tup.get2();
+
+            assertEquals(cpId, cpRec.checkpointId());
+            assertEquals(start, cpRec.checkpointMark());
+            assertTrue(cpRec.end());
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDirtyFlag() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        GridCacheSharedContext<Object, Object> shared = ig.context().cache().context();
+
+        int cacheId = shared.cache().cache(cacheName).context().cacheId();
+
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database();
+
+        // Disable integrated checkpoint thread.
+        dbMgr.enableCheckpoints(false);
+
+        PageMemoryEx mem = (PageMemoryEx) dbMgr.memoryPolicy(null).pageMemory();
+
+        ig.context().cache().context().database().checkpointReadLock();
+
+        FullPageId[] pageIds = new FullPageId[100];
+
+        try {
+            for (int i = 0; i < pageIds.length; i++)
+                pageIds[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+
+            for (FullPageId fullId : pageIds) {
+                long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+
+                try {
+                    assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); //page is dirty right after allocation
+
+                    long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+                    PageIO.setPageId(pageAddr, fullId.pageId());
+
+                    try {
+                        assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+                    }
+                    finally {
+                        mem.writeUnlock(fullId.cacheId(), fullId.pageId(),page, null,true);
+                    }
+
+                    assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+                }
+                finally {
+                    mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+                }
+            }
+        }
+        finally {
+            ig.context().cache().context().database().checkpointReadUnlock();
+        }
+
+        Collection<FullPageId> cpPages = mem.beginCheckpoint();
+
+        ig.context().cache().context().database().checkpointReadLock();
+
+        try {
+            for (FullPageId fullId : pageIds) {
+                assertTrue(cpPages.contains(fullId));
+
+                ByteBuffer buf = ByteBuffer.allocate(mem.pageSize());
+
+                long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+
+                try {
+                    assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+
+                    long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+                    try {
+                        assertFalse(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+
+                        for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                            PageUtils.putByte(pageAddr, i, (byte)1);
+                    }
+                    finally {
+                        mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true);
+                    }
+
+                    assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+
+                    buf.rewind();
+
+                    mem.getForCheckpoint(fullId, buf, null);
+
+                    buf.position(PageIO.COMMON_HEADER_END);
+
+                    while (buf.hasRemaining())
+                        assertEquals((byte)0, buf.get());
+                }
+                finally {
+                    mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+                }
+            }
+        }
+        finally {
+            ig.context().cache().context().database().checkpointReadUnlock();
+        }
+
+        mem.finishCheckpoint();
+
+        for (FullPageId fullId : pageIds) {
+            long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+            try {
+                assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+            }
+            finally {
+                mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void writePageData(FullPageId fullId, PageMemory mem) throws Exception {
+        long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+        try {
+            long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+            try {
+                DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.pageSize());
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                    PageUtils.putByte(pageAddr, i, (byte)rnd.nextInt(255));
+            }
+            finally {
+                mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true);
+            }
+        }
+        finally {
+            mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+        }
+    }
+
    /**
     * Verifies that page contents read back through page memory (read-through from the file
     * store) match both the expected per-page state and the last page snapshot recorded in
     * the WAL between the first checkpoint's start and end markers.
     *
     * @param res Result map to verify (page -> last written state; -1 means never written).
     * @param mem Memory.
     * @param start WAL pointer of the checkpoint start marker to replay from.
     * @param wal Write-ahead log manager.
     * @throws Exception If replay or verification fails.
     */
    private void verifyReads(
        Map<FullPageId, Integer> res,
        PageMemory mem,
        WALPointer start,
        IgniteWriteAheadLogManager wal
    ) throws Exception {
        // Last logged snapshot of each page, collected from the WAL.
        Map<FullPageId, byte[]> replay = new HashMap<>();

        try (WALIterator it = wal.replay(start)) {
            // The first record at 'start' must be the checkpoint start marker.
            IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();

            assertTrue("Invalid record: " + tup, tup.get2() instanceof CheckpointRecord);

            CheckpointRecord cpRec = (CheckpointRecord)tup.get2();

            while (it.hasNextX()) {
                tup = it.nextX();

                WALRecord rec = tup.get2();

                if (rec instanceof CheckpointRecord) {
                    CheckpointRecord end = (CheckpointRecord)rec;

                    // Found the finish mark.
                    if (end.checkpointId().equals(cpRec.checkpointId()) && end.end())
                        break;
                }
                else if (rec instanceof PageSnapshot) {
                    PageSnapshot page = (PageSnapshot)rec;

                    // Later snapshots of the same page overwrite earlier ones.
                    replay.put(page.fullPageId(), page.pageData());
                }
            }
        }

        // Check read-through from the file store.
        for (Map.Entry<FullPageId, Integer> entry : res.entrySet()) {
            FullPageId fullId = entry.getKey();
            int state = entry.getValue();

            if (state == -1) {
                info("Page was never written: " + fullId);

                continue;
            }

            byte[] walData = replay.get(fullId);

            assertNotNull("Missing WAL record for a written page: " + fullId, walData);

            long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
            try {
                long pageAddr = mem.readLock(fullId.cacheId(), fullId.pageId(), page);

                try {
                    // Every payload byte must equal the recorded state, both in memory and in WAL.
                    for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) {
                        assertEquals("Invalid state [pageId=" + fullId + ", pos=" + i + ']',
                            state & 0xFF, PageUtils.getByte(pageAddr, i) & 0xFF);

                        assertEquals("Invalid WAL state [pageId=" + fullId + ", pos=" + i + ']',
                            state & 0xFF, walData[i] & 0xFF);
                    }
                }
                finally {
                    mem.readUnlock(fullId.cacheId(), fullId.pageId(), page);
                }
            }
            finally {
                mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
            }
        }
    }
+
+    /**
+     * @param mem Memory to use.
+     * @param storeMgr Store manager.
+     * @param cacheId Cache ID.
+     * @return Result map of random operations.
+     * @throws Exception If failure occurred.
+     */
+    private IgniteBiTuple<Map<FullPageId, Integer>, WALPointer> runCheckpointing(
+        final IgniteEx ig,
+        final PageMemoryImpl mem,
+        final IgnitePageStoreManager storeMgr,
+        final IgniteWriteAheadLogManager wal,
+        final int cacheId
+    ) throws Exception {
+        final ConcurrentMap<FullPageId, Integer> resMap = new ConcurrentHashMap<>();
+
+        final FullPageId pages[] = new FullPageId[TOTAL_PAGES];
+
+        Set<FullPageId> allocated = new HashSet<>();
+
+        for (int i = 0; i < TOTAL_PAGES; i++) {
+            FullPageId fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId);
+
+            resMap.put(fullId, -1);
+
+            pages[i] = fullId;
+
+            allocated.add(fullId);
+        }
+
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        // Simulate transaction lock.
+        final ReadWriteLock updLock = new ReentrantReadWriteLock();
+
+        // Mark the start position.
+        CheckpointRecord cpRec = new CheckpointRecord(null, false);
+
+        WALPointer start = wal.log(cpRec);
+
+        wal.fsync(start);
+
+        IgniteInternalFuture<Long> updFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (true) {
+                    FullPageId fullId = pages[ThreadLocalRandom.current().nextInt(TOTAL_PAGES)];
+
+                    updLock.readLock().lock();
+
+                    try {
+                        if (!run.get())
+                            return null;
+
+                        ig.context().cache().context().database().checkpointReadLock();
+
+                        try {
+                            long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+
+                            try {
+                                long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page);
+
+                                PageIO.setPageId(pageAddr, fullId.pageId());
+
+                                try {
+                                    int state = resMap.get(fullId);
+
+                                    if (state != -1) {
+                                        if (VERBOSE)
+                                            info("Verify page [fullId=" + fullId + ", state=" + state +
+                                                ", buf=" + pageAddr +
+                                                ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) +
+                                                ", page=" + U.hexLong(System.identityHashCode(page)) + ']');
+
+                                        for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                                            assertEquals("Verify page failed [fullId=" + fullId +
+                                                    ", i=" + i +
+                                                    ", state=" + state +
+                                                    ", buf=" + pageAddr +
+                                                    ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) +
+                                                    ", page=" + U.hexLong(System.identityHashCode(page)) + ']',
+                                                state & 0xFF, PageUtils.getByte(pageAddr, i) & 0xFF);
+                                    }
+
+                                    state = (state + 1) & 0xFF;
+
+                                    if (VERBOSE)
+                                        info("Write page [fullId=" + fullId + ", state=" + state +
+                                            ", buf=" + pageAddr +
+                                            ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) +
+                                            ", page=" + U.hexLong(System.identityHashCode(page)) + ']');
+
+                                    for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                                        PageUtils.putByte(pageAddr, i, (byte)state);
+
+                                    resMap.put(fullId, state);
+                                }
+                                finally {
+                                    mem.writeUnlock(fullId.cacheId(), fullId.pageId(),page, null,true);
+                                }
+                            }
+                            finally {
+                                mem.releasePage(fullId.cacheId(), fullId.pageId(),page);}
+                            }
+                            finally {
+                                ig.context().cache().context().database().checkpointReadUnlock();
+                            }
+                        }
+                        finally {
+                            updLock.readLock().unlock();
+                        }
+                    }
+                }
+            }, 8, "update-thread");
+
+        int checkpoints = 20;
+
+        while (checkpoints > 0) {
+            Map<FullPageId, Integer> snapshot = null;
+
+            Collection<FullPageId> pageIds;
+
+            updLock.writeLock().lock();
+
+            try {
+                snapshot = new HashMap<>(resMap);
+
+                pageIds = mem.beginCheckpoint();
+
+                checkpoints--;
+
+                if (checkpoints == 0)
+                    // No more writes should be done at this point.
+                    run.set(false);
+
+                info("Acquired pages for checkpoint: " + pageIds.size());
+            }
+            finally {
+                updLock.writeLock().unlock();
+            }
+
+            boolean ok = false;
+
+            try {
+                ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
+
+                tmpBuf.order(ByteOrder.nativeOrder());
+
+                long begin = System.currentTimeMillis();
+
+                long cp = 0;
+
+                long write = 0;
+
+                for (FullPageId fullId : pageIds) {
+                    long cpStart = System.nanoTime();
+
+                    Integer tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+
+                    if (tag == null)
+                        continue;
+
+                    long cpEnd = System.nanoTime();
+
+                    cp += cpEnd - cpStart;
+
+                    Integer state = snapshot.get(fullId);
+
+                    if (allocated.contains(fullId) && state != -1) {
+                        tmpBuf.rewind();
+
+                        Integer first = null;
+
+                        for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) {
+                            int val = tmpBuf.get(i) & 0xFF;
+
+                            if (first == null)
+                                first = val;
+
+                            // Avoid string concat.
+                            if (first != val)
+                                assertEquals("Corrupted buffer at position [pageId=" + fullId + ", pos=" + i + ']',
+                                    (int)first, val);
+
+                            // Avoid string concat.
+                            if (state != val)
+                                assertEquals("Invalid value at position [pageId=" + fullId + ", pos=" + i + ']',
+                                    (int)state, val);
+                        }
+                    }
+
+                    tmpBuf.rewind();
+
+                    long writeStart = System.nanoTime();
+
+                    storeMgr.write(cacheId, fullId.pageId(), tmpBuf, tag);
+
+                    long writeEnd = System.nanoTime();
+
+                    write += writeEnd - writeStart;
+
+                    tmpBuf.rewind();
+                }
+
+                long syncStart = System.currentTimeMillis();
+
+                storeMgr.sync(cacheId, 0);
+
+                long end = System.currentTimeMillis();
+
+                info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " +
+                    "write took " + (write / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
+
+                ok = true;
+            }
+            finally {
+                info("Finishing checkpoint...");
+
+                mem.finishCheckpoint();
+
+                info("Finished checkpoint");
+
+                if (!ok) {
+                    info("Cancelling updates...");
+
+                    run.set(false);
+
+                    updFut.get();
+                }
+            }
+
+            if (checkpoints != 0)
+                Thread.sleep(2_000);
+        }
+
+        info("checkpoints=" + checkpoints + ", done=" + updFut.isDone());
+
+        updFut.get();
+
+        // Mark the end.
+        wal.fsync(wal.log(new CheckpointRecord(cpRec.checkpointId(), start, true)));
+
+        assertEquals(0, mem.activePagesCount());
+
+        for (FullPageId fullId : pages) {
+
+            long page = mem.acquirePage(fullId.cacheId(), fullId.pageId());
+
+            try {
+                assertFalse("Page has a temp heap copy after the last checkpoint: [cacheId=" +
+                    fullId.cacheId() + ", pageId=" + fullId.pageId() + "]", mem.hasTempCopy(page));
+
+                assertFalse("Page is dirty after the last checkpoint: [cacheId=" +
+                    fullId.cacheId() + ", pageId=" + fullId.pageId() + "]", mem.isDirty(fullId.cacheId(), fullId.pageId(), page));
+            }
+            finally {
+                mem.releasePage(fullId.cacheId(), fullId.pageId(), page);
+            }
+        }
+
+        return F.t((Map<FullPageId, Integer>)resMap, start);
+    }
+
    /**
     * Removes the persistent store work directory ({@code db}) so tests start from a clean state.
     *
     * @throws IgniteCheckedException If the work directory cannot be resolved.
     */
    private void deleteWorkFiles() throws IgniteCheckedException {
        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
new file mode 100644
index 0000000..6ca0a1d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.configuration.PersistentStoreConfiguration.DFLT_CHECKPOINTING_PAGE_BUFFER_SIZE;
+
+/**
+ *
+ */
+public class IgnitePdsWalTlbTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(100 * 1024 * 1024);
+        memPlcCfg.setMaxSize(100 * 1024 * 1024);
+
+        memCfg.setMemoryPolicies(memPlcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setCheckpointingPageBufferSize(DFLT_CHECKPOINTING_PAGE_BUFFER_SIZE + 1)
+                .setTlbSize(640000000)
+        );
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        if (gridName.endsWith("1"))
+            cfg.setClientMode(true);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+
+        stopAllGrids();
+
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testWalDirectOutOfMemory() throws Exception {
+        IgniteEx ig = grid(1);
+
+        boolean locked = true;
+
+        try {
+            IgniteDataStreamer<Integer, Integer> streamer = ig.dataStreamer(CACHE_NAME);
+
+            for (int i = 0; i < 100_000; i++) {
+                streamer.addData(i, 1);
+
+                if (i > 0 && i % 10_000 == 0)
+                    info("Done put: " + i);
+            }
+        }
+        catch (CacheException ignore) {
+            // expected
+            locked = false;
+        }
+        finally {
+            assertFalse(locked);
+
+            stopAllGrids();
+        }
+    }
+}


Mime
View raw message