ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [39/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core
Date Sun, 11 Jun 2017 20:04:05 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
new file mode 100755
index 0000000..2146f98
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -0,0 +1,3113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMException;
+import javax.management.MBeanRegistrationException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.PersistenceMetrics;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.ClusterState;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.port.GridPortRecord;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.CountDownFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridInClosure3X;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P3;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+
+/**
+ *
+ */
+@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
+public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
+    /** Name of the system property that makes checkpoint threads skip file sync (for tests only). */
+    public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
+
+    /** Skip sync. */
+    private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
+
+    /** Skip page CRC calculation flag (see {@link IgniteSystemProperties#IGNITE_PDS_SKIP_CRC}). */
+    private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
+
+    /** WAL rebalance threshold, default 500 000 — presumably the update-counter gap limit; confirm at use sites. */
+    private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
+        IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
+
+    /** Checkpoint lock hold count. */
+    private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() {
+        @Override protected Integer initialValue() {
+            return 0;
+        }
+    };
+
+    /** Assertion enabled. */
+    private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus();
+
+    /** Checkpoint file name pattern: {@code <timestamp>-<checkpoint id>-(START|END).bin}. */
+    private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
+
+    /** Accepts only files matching {@link #CP_FILE_NAME_PATTERN}. */
+    private static final FileFilter CP_FILE_FILTER = new FileFilter() {
+        @Override public boolean accept(File f) {
+            return CP_FILE_NAME_PATTERN.matcher(f.getName()).matches();
+        }
+    };
+
+    /** Orders partitions by ascending partition ID. */
+    private static final Comparator<GridDhtLocalPartition> ASC_PART_COMPARATOR = new Comparator<GridDhtLocalPartition>() {
+        @Override public int compare(GridDhtLocalPartition a, GridDhtLocalPartition b) {
+            return Integer.compare(a.id(), b.id());
+        }
+    };
+
+    /** Orders checkpoint marker files by timestamp; for equal timestamps START sorts before END. */
+    private static final Comparator<File> CP_TS_COMPARATOR = new Comparator<File>() {
+        /** {@inheritDoc} */
+        @Override public int compare(File o1, File o2) {
+            Matcher m1 = CP_FILE_NAME_PATTERN.matcher(o1.getName());
+            Matcher m2 = CP_FILE_NAME_PATTERN.matcher(o2.getName());
+
+            boolean s1 = m1.matches();
+            boolean s2 = m2.matches();
+
+            assert s1 : "Failed to match CP file: " + o1.getAbsolutePath();
+            assert s2 : "Failed to match CP file: " + o2.getAbsolutePath();
+
+            long ts1 = Long.parseLong(m1.group(1));
+            long ts2 = Long.parseLong(m2.group(1));
+
+            int res = Long.compare(ts1, ts2);
+
+            if (res == 0) {
+                CheckpointEntryType type1 = CheckpointEntryType.valueOf(m1.group(3));
+                CheckpointEntryType type2 = CheckpointEntryType.valueOf(m2.group(3));
+
+                assert type1 != type2 : "o1=" + o1.getAbsolutePath() + ", o2=" + o2.getAbsolutePath();
+
+                res = type1 == CheckpointEntryType.START ? -1 : 1;
+            }
+
+            return res;
+        }
+    };
+
+    /** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
+    private volatile Checkpointer checkpointer;
+
+    /** For testing only. */
+    private volatile boolean checkpointsEnabled = true;
+
+    /** For testing only. */
+    private volatile GridFutureAdapter<Void> enableChangeApplied;
+
+    /** Taken via checkpointReadLock()/checkpointReadUnlock(); write side acquired in onKernalStop0(). */
+    private ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+
+    /** Checkpoint frequency (see {@link PersistentStoreConfiguration#getCheckpointingFrequency()}). */
+    private long checkpointFreq;
+
+    /** Checkpoint page buffer size in bytes, computed in {@link #initDataBase()}. */
+    private long checkpointPageBufSize;
+
+    /** File page store manager (cast of {@code cctx.pageStore()}; {@code null} on client nodes). */
+    private FilePageStoreManager storeMgr;
+
+    /** Directory holding checkpoint START/END marker files ({@code cp} under the store work dir). */
+    private File cpDir;
+
+    /** Whether to log checkpoint statistics. */
+    private volatile boolean printCheckpointStats = true;
+
+    /** Database configuration. */
+    private final PersistentStoreConfiguration persistenceCfg;
+
+    /** Registered checkpoint listeners. */
+    private final Collection<DbCheckpointListener> lsnrs = new CopyOnWriteArrayList<>();
+
+    /** Checkpoint history. */
+    private final CheckpointHistory checkpointHist = new CheckpointHistory();
+
+    /** Set under the checkpoint write lock when the manager stops (see onKernalStop0()). */
+    private boolean stopping;
+
+    /** Checkpoint runner thread pool. */
+    private ExecutorService asyncRunner;
+
+    /** Buffer for the checkpoint threads. */
+    private ThreadLocal<ByteBuffer> threadBuf;
+
+    /** Index rebuild futures keyed by cache ID (populated in beforeExchange()). */
+    private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts = new ConcurrentHashMap<>();
+
+    /** Lock holder. */
+    private FileLockHolder fileLockHolder;
+
+    /** Lock wait time. */
+    private final int lockWaitTime;
+
+    /** Per-group map of (counter, WAL pointer) pairs reserved for exchange — TODO confirm exact semantics. */
+    private Map<Integer, Map<Integer, T2<Long, WALPointer>>> reservedForExchange;
+
+    /** Per (group, partition) pairs of (counter, WAL pointer) reserved for preloading — TODO confirm semantics. */
+    private final ConcurrentMap<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading = new ConcurrentHashMap<>();
+
+    /** Snapshot manager. */
+    private IgniteCacheSnapshotManager snapshotMgr;
+
+    /** Persistent store metrics implementation. */
+    private PersistenceMetricsImpl persStoreMetrics;
+
+    /** Name of the registered persistence metrics MBean; unregistered in onKernalStop0(). */
+    private ObjectName persistenceMetricsMbeanName;
+
+    /**
+     * Creates the database shared manager and captures persistence settings from the node configuration.
+     *
+     * @param ctx Kernal context.
+     */
+    public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
+        IgniteConfiguration cfg = ctx.config();
+
+        persistenceCfg = cfg.getPersistentStoreConfiguration();
+
+        assert persistenceCfg != null : "PageStore should not be created if persistence is disabled.";
+
+        checkpointFreq = persistenceCfg.getCheckpointingFrequency();
+
+        lockWaitTime = persistenceCfg.getLockWaitTime();
+
+        final int pageSize = cfg.getMemoryConfiguration().getPageSize();
+
+        // One direct page-sized buffer per checkpoint thread, in native byte order.
+        threadBuf = new ThreadLocal<ByteBuffer>() {
+            /** {@inheritDoc} */
+            @Override protected ByteBuffer initialValue() {
+                ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
+
+                tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+                return tmpWriteBuf;
+            }
+        };
+
+        persStoreMetrics = new PersistenceMetricsImpl(
+            persistenceCfg.isMetricsEnabled(),
+            persistenceCfg.getRateTimeInterval(),
+            persistenceCfg.getSubIntervals()
+        );
+    }
+
+    /**
+     * @return Current checkpointer instance, or {@code null} if it has not been created yet.
+     */
+    public Checkpointer getCheckpointer() {
+        return checkpointer;
+    }
+
+    /**
+     * For test use only. Enables or disables checkpoints and wakes the checkpointer up.
+     *
+     * @param enable {@code True} to enable checkpoints, {@code false} to disable.
+     * @return Future presumably completed once the checkpointer observes the new value — confirm with Checkpointer.
+     */
+    public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
+        GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+        enableChangeApplied = fut;
+
+        checkpointsEnabled = enable;
+
+        wakeupForCheckpoint("enableCheckpoints()");
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        snapshotMgr = cctx.snapshot();
+
+        assert !cctx.kernalContext().state().active() : "Cluster with persistent must starting as inactive.";
+
+        // Page stores, file lock and metrics MBean only exist on server nodes.
+        if (!cctx.kernalContext().clientNode()) {
+            IgnitePageStoreManager store = cctx.pageStore();
+
+            assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;
+
+            storeMgr = (FilePageStoreManager)store;
+
+            // Checkpoint markers live in the 'cp' subdirectory of the store work directory.
+            cpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), "cp").toFile();
+
+            if (!U.mkdirs(cpDir))
+                throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
+
+            // Guards the work directory against concurrent use by another node (acquired in lock()).
+            fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
+
+            persStoreMetrics.wal(cctx.wal());
+
+            try {
+                persistenceMetricsMbeanName = U.registerMBean(
+                    cctx.kernalContext().config().getMBeanServer(),
+                    cctx.kernalContext().igniteInstanceName(),
+                    "Persistent Store",
+                    "PersistenceMetrics",
+                    persStoreMetrics,
+                    PersistenceMetricsMXBean.class);
+            }
+            catch (JMException e) {
+                throw new IgniteCheckedException("Failed to register persistence metrics MBean", e);
+            }
+        }
+    }
+
+    /**
+     * Creates the checkpoint runner pool (when more than one checkpointing thread is configured)
+     * and computes the checkpoint page buffer size: if the configured size equals the configuration
+     * default, it is raised to {@code min(total memory size / 4, 2GB)} when that is larger.
+     * Finally delegates the rest of the startup to the base implementation.
+     *
+     * @throws IgniteCheckedException If initialization fails.
+     */
+    @Override public void initDataBase() throws IgniteCheckedException {
+        Long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
+
+        if (persistenceCfg.getCheckpointingThreads() > 1)
+            asyncRunner = new ThreadPoolExecutor(
+                persistenceCfg.getCheckpointingThreads(),
+                persistenceCfg.getCheckpointingThreads(),
+                30L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>()
+            );
+
+        // Intentionally use identity comparison to check if configuration default has changed.
+        //noinspection NumberEquality
+        if (cpBufSize == PersistentStoreConfiguration.DFLT_CHECKPOINTING_PAGE_BUFFER_SIZE) {
+            MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration();
+
+            assert memCfg != null;
+
+            // Sum all memory policy sizes, saturating at Long.MAX_VALUE on overflow.
+            long totalSize = memCfg.getSystemCacheMaxSize();
+
+            if (memCfg.getMemoryPolicies() == null)
+                totalSize += MemoryConfiguration.DFLT_MEMORY_POLICY_MAX_SIZE;
+            else {
+                for (MemoryPolicyConfiguration memPlc : memCfg.getMemoryPolicies()) {
+                    if (Long.MAX_VALUE - memPlc.getMaxSize() > totalSize)
+                        totalSize += memPlc.getMaxSize();
+                    else {
+                        totalSize = Long.MAX_VALUE;
+
+                        break;
+                    }
+                }
+
+                assert totalSize > 0;
+            }
+
+            // Limit the checkpoint page buffer size by 2GB.
+            long dfltSize = 2 * 1024L * 1024L * 1024L;
+
+            long adjusted = Math.min(totalSize / 4, dfltSize);
+
+            if (cpBufSize < adjusted) {
+                U.quietAndInfo(log,
+                    "Default checkpoint page buffer size is too small, setting to an adjusted value: "
+                        + U.readableSize(adjusted, false)
+                );
+
+                cpBufSize = adjusted;
+            }
+        }
+
+        checkpointPageBufSize = cpBufSize;
+
+        super.start0();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initPageMemoryDataStructures(MemoryConfiguration dbCfg) throws IgniteCheckedException {
+        // No-op: deliberate override of the base implementation — presumably page memory
+        // initialization is deferred for the persistent store (see initDataBase()/onActivate()).
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
+        // Nothing to restore on reconnect, on clients, or while the cluster is inactive.
+        if (reconnect || cctx.kernalContext().clientNode() || !cctx.kernalContext().state().active())
+            return;
+
+        if (persistenceEnabled() && cctx.kernalContext().state().active())
+            initDataBase();
+
+        // Todo before join local node.
+
+        Collection<String> cacheNames = new HashSet<>();
+
+        // Initialize page stores: system caches first, then locally configured user caches,
+        // then caches whose configurations were persisted on disk but are absent locally.
+        // TODO IGNITE-5075 group descriptors.
+        for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
+            if (CU.isSystemCache(ccfg.getName())) {
+                storeMgr.initializeForCache(
+                    cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), new StoredCacheData(ccfg));
+
+                cacheNames.add(ccfg.getName());
+            }
+        }
+
+        for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
+            if (!CU.isSystemCache(ccfg.getName())) {
+                DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName());
+
+                if (cacheDesc != null)
+                    storeMgr.initializeForCache(cacheDesc.groupDescriptor(), new StoredCacheData(ccfg));
+
+                cacheNames.add(ccfg.getName());
+            }
+        }
+
+        for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) {
+            if (!cacheNames.contains(cacheData.config().getName()))
+                storeMgr.initializeForCache(
+                    cctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor(), cacheData);
+        }
+
+        // Bring page memory to the last consistent checkpoint state and resume WAL logging.
+        readCheckpointAndRestoreMemory();
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        snapshotMgr = cctx.snapshot();
+
+        // Clients have no page stores to initialize.
+        if (cctx.localNode().isClient())
+            return;
+
+        initDataBase();
+
+        if (log.isDebugEnabled())
+            log.debug("Activate database manager [id=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        // Todo join local info.
+
+        Collection<String> cacheNames = new HashSet<>();
+
+        // Initialize page stores: system caches first, then locally configured user caches,
+        // then caches whose configurations were persisted on disk but are absent locally.
+        // TODO IGNITE-5075 group descriptors.
+        for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
+            if (CU.isSystemCache(ccfg.getName())) {
+                storeMgr.initializeForCache(
+                    cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), new StoredCacheData(ccfg));
+
+                cacheNames.add(ccfg.getName());
+            }
+        }
+
+        for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
+            if (!CU.isSystemCache(ccfg.getName())) {
+                DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName());
+
+                if (cacheDesc != null)
+                    storeMgr.initializeForCache(cacheDesc.groupDescriptor(), new StoredCacheData(ccfg));
+
+                cacheNames.add(ccfg.getName());
+            }
+        }
+
+        for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) {
+            if (!cacheNames.contains(cacheData.config().getName()))
+                storeMgr.initializeForCache(
+                    cctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor(), cacheData);
+        }
+
+        // Bring page memory to the last consistent checkpoint state and resume WAL logging.
+        readCheckpointAndRestoreMemory();
+
+        if (log.isDebugEnabled())
+            log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        stop0(true);
+
+        if (log.isDebugEnabled())
+            log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        onKernalStop0(false);
+
+        /* Must be here, because after deactivate we can invoke activate and file lock must be already configured */
+        stopping = false;
+
+        // NOTE(review): storeMgr is only assigned on server nodes (start0) — confirm this path is
+        // never taken on clients, otherwise storeMgr.workDir() throws NPE here.
+        fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
+    }
+
+    /**
+     * Brings page memory to the last consistent checkpoint state (if needed) and resumes WAL
+     * logging from the last valid record, logging a {@link MemoryRecoveryRecord} marker.
+     * Runs under the checkpoint read lock.
+     *
+     * @throws IgniteCheckedException If memory restore or WAL resume fails.
+     */
+    private void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
+        checkpointReadLock();
+
+        try {
+            CheckpointStatus status = readCheckpointStatus();
+
+            // First, bring memory to the last consistent checkpoint state if needed.
+            // This method should return a pointer to the last valid record in the WAL.
+            WALPointer restore = restoreMemory(status);
+
+            cctx.wal().resumeLogging(restore);
+
+            cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
+        }
+        catch (StorageException e) {
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            checkpointReadUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lock() throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Try to capture file lock [nodeId=" +
+                cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
+
+        // Acquire the work-directory file lock; waits up to lockWaitTime (see FileLockHolder.tryLock).
+        fileLockHolder.tryLock(lockWaitTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unLock() {
+        if (log.isDebugEnabled())
+            log.debug("Release file lock [nodeId=" +
+                cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
+
+        // Release the work-directory file lock acquired in lock().
+        fileLockHolder.release();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheStop(GridCacheContext cctx) {
+        // Only the snapshot manager needs per-cache cleanup here.
+        snapshotMgr.onCacheStop(cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        // Take the write lock so the stopping flag cannot be set concurrently with checkpoint-lock holders.
+        checkpointLock.writeLock().lock();
+
+        try {
+            stopping = true;
+        }
+        finally {
+            checkpointLock.writeLock().unlock();
+        }
+
+        snapshotMgr.onKernalStop(cancel);
+
+        shutdownCheckpointer(cancel);
+
+        lsnrs.clear();
+
+        super.onKernalStop0(cancel);
+
+        // File lock only exists on server nodes (see start0()).
+        if (!cctx.kernalContext().clientNode()) {
+            unLock();
+
+            fileLockHolder.close();
+        }
+
+        // Unregister the persistence metrics MBean registered in start0().
+        if (persistenceMetricsMbeanName != null) {
+            try {
+                cctx.kernalContext().config().getMBeanServer().unregisterMBean(persistenceMetricsMbeanName);
+            }
+            catch (InstanceNotFoundException ignore) {
+                // No-op, nothing to unregister.
+            }
+            catch (MBeanRegistrationException e) {
+                U.error(log, "Failed to unregister persistence metrics MBean (will continue stop routine)", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        super.stop0(cancel);
+
+        // Also stop the snapshot manager.
+        snapshotMgr.stop(cancel);
+    }
+
+    /**
+     * Splits {@code cacheSize} bytes into per-segment fragment sizes plus one trailing element
+     * holding the checkpoint page buffer size.
+     *
+     * @param concLvl Desired concurrency level; values below 2 fall back to the CPU count.
+     * @param cacheSize Total size to split, in bytes.
+     * @return Array of {@code segments + 1} sizes: equal fragments followed by the checkpoint buffer size.
+     */
+    private long[] calculateFragmentSizes(int concLvl, long cacheSize) {
+        int segments = concLvl < 2 ? Runtime.getRuntime().availableProcessors() : concLvl;
+
+        // Each fragment is at least 1 MB.
+        long fragmentSize = Math.max(cacheSize / segments, 1024L * 1024);
+
+        long[] sizes = new long[segments + 1];
+
+        Arrays.fill(sizes, 0, segments, fragmentSize);
+
+        sizes[segments] = checkpointPageBufSize;
+
+        return sizes;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected PageMemory createPageMemory(
+        DirectMemoryProvider memProvider,
+        MemoryConfiguration memCfg,
+        MemoryPolicyConfiguration plcCfg,
+        MemoryMetricsImpl memMetrics
+    ) {
+        memMetrics.persistenceEnabled(true);
+
+        PageMemoryImpl pageMem = new PageMemoryImpl(
+            memProvider,
+            calculateFragmentSizes(
+                memCfg.getConcurrencyLevel(),
+                plcCfg.getMaxSize()
+            ),
+            cctx,
+            memCfg.getPageSize(),
+            // Write closure: persists a page to its page store and notifies the snapshot manager.
+            new GridInClosure3X<FullPageId, ByteBuffer, Integer>() {
+                @Override public void applyx(
+                    FullPageId fullId,
+                    ByteBuffer pageBuf,
+                    Integer tag
+                ) throws IgniteCheckedException {
+                    storeMgr.write(fullId.cacheId(), fullId.pageId(), pageBuf, tag);
+
+                    snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
+                }
+            },
+            // Change-tracker closure: forwards tracker page changes to the snapshot manager.
+            new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
+                @Override public void applyx(
+                    Long page,
+                    FullPageId fullId,
+                    PageMemoryEx pageMem
+                ) throws IgniteCheckedException {
+                    snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
+                }
+            },
+            this,
+            memMetrics
+        );
+
+        memMetrics.pageMemory(pageMem);
+
+        return pageMem;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPolicyEvictionProperties(MemoryPolicyConfiguration plcCfg, MemoryConfiguration dbCfg)
+        throws IgniteCheckedException {
+        // Data page eviction is not supported together with persistence.
+        if (plcCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED)
+            throw new IgniteCheckedException("Page eviction is not compatible with persistence: " + plcCfg.getName());
+    }
+
+    /**
+     * Stops the checkpointer thread and the async checkpoint runner pool.
+     *
+     * @param cancel If {@code true}, stops the checkpointer via {@code shutdownNow()};
+     *      otherwise via {@code cancel()}.
+     */
+    @SuppressWarnings("unused")
+    private void shutdownCheckpointer(boolean cancel) {
+        Checkpointer cp = checkpointer;
+
+        if (cp != null) {
+            if (cancel)
+                cp.shutdownNow();
+            else
+                cp.cancel();
+
+            try {
+                U.join(cp);
+
+                checkpointer = null;
+            }
+            catch (IgniteInterruptedCheckedException ignore) {
+                U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
+                    "will not wait for checkpoint to finish.");
+
+                // Interrupted: force immediate shutdown and keep joining until the thread exits.
+                cp.shutdownNow();
+
+                while (true) {
+                    try {
+                        U.join(cp);
+
+                        checkpointer = null;
+
+                        // Fail the pending checkpoint future so waiters do not hang.
+                        cp.scheduledCp.cpFinishFut.onDone(
+                            new NodeStoppingException("Checkpointer is stopped during node stop."));
+
+                        break;
+                    }
+                    catch (IgniteInterruptedCheckedException ignored) {
+                        //Ignore
+                    }
+                }
+
+                // Restore the interrupt status swallowed by the join loop above.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (asyncRunner != null) {
+            asyncRunner.shutdownNow();
+
+            try {
+                asyncRunner.awaitTermination(2, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+        DiscoveryEvent discoEvt = fut.discoveryEvent();
+
+        boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
+
+        boolean locNode = discoEvt.eventNode().isLocal();
+
+        boolean isSrvNode = !cctx.kernalContext().clientNode();
+
+        boolean clusterStatusActive = cctx.kernalContext().state().active();
+
+        boolean clusterInTransitionStateToActive = fut.newClusterState() == ClusterState.ACTIVE;
+
+        // Before local node join event.
+        // Restore state on cluster activation, or when this server node joins an active cluster.
+        if (clusterInTransitionStateToActive ||
+            (joinEvt && locNode && isSrvNode && clusterStatusActive))
+            restoreState();
+
+        // Schedule index rebuild from hash for caches started at this topology version
+        // that have no index store yet.
+        if (cctx.kernalContext().query().moduleEnabled()) {
+            for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
+                if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) &&
+                    !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
+                    final int cacheId = cacheCtx.cacheId();
+
+                    final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
+                        .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
+
+                    idxRebuildFuts.put(cacheId, rebuildFut);
+
+                    // Drop the future from the map once the rebuild completes.
+                    rebuildFut.listen(new CI1<IgniteInternalFuture>() {
+                        @Override public void apply(IgniteInternalFuture igniteInternalFut) {
+                            idxRebuildFuts.remove(cacheId, rebuildFut);
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
+        // Futures are registered in beforeExchange() and auto-removed when the rebuild
+        // completes, so a null result means no rebuild is in progress for this cache.
+        return idxRebuildFuts.get(cacheId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean persistenceEnabled() {
+        // This database manager implementation always reports persistence as enabled.
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheGroupsStopped(
+        Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) {
+        // Make sure no checkpoint is in flight while group structures are being torn down.
+        try {
+            waitForCheckpoint("caches stop");
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
+        }
+
+        // Group destroyed cache group IDs by the page memory instance owning their pages.
+        Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
+
+        for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
+            PageMemoryEx pageMem = (PageMemoryEx)tup.get1().memoryPolicy().pageMemory();
+
+            Collection<Integer> grpIds = destroyed.get(pageMem);
+
+            if (grpIds == null) {
+                grpIds = new HashSet<>();
+
+                destroyed.put(pageMem, grpIds);
+            }
+
+            grpIds.add(tup.get1().groupId());
+
+            pageMem.onCacheGroupDestroyed(tup.get1().groupId());
+        }
+
+        // Kick off asynchronous eviction of the destroyed groups' pages from each page
+        // memory, then wait for all clear operations to finish.
+        Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());
+
+        for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
+            final Collection<Integer> grpIds = entry.getValue();
+
+            clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() {
+                @Override public boolean apply(Integer grpId, Long pageId, Integer tag) {
+                    return grpIds.contains(grpId);
+                }
+            }, false));
+        }
+
+        for (IgniteInternalFuture<Void> clearFut : clearFuts) {
+            try {
+                clearFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                // Fixed: use U.error(log, ...) for consistency with the rest of this method
+                // (was log.error(...)).
+                U.error(log, "Failed to clear page memory", e);
+            }
+        }
+
+        // Finally release page store resources (files) for groups hosted on this node.
+        if (cctx.pageStore() != null) {
+            for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
+                CacheGroupContext grp = tup.get1();
+
+                if (grp.affinityNode()) {
+                    try {
+                        cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
+                            "[cache=" + grp.cacheOrGroupName() + "]", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquire memory state.
+     * (Fixed garbled javadoc: "acquiSnapshotWorkerre" -> "acquire".)
+     *
+     * @throws IgniteException If the node is stopping or waiting for checkpoint begin failed.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    @Override public void checkpointReadLock() {
+        // The checkpointer thread holds the write lock and may re-enter freely.
+        if (checkpointLock.writeLock().isHeldByCurrentThread())
+            return;
+
+        for (; ; ) {
+            checkpointLock.readLock().lock();
+
+            if (stopping) {
+                checkpointLock.readLock().unlock();
+
+                // Fixed: throw IgniteException instead of a raw RuntimeException, consistent
+                // with the checkpoint-begin failure path below (IgniteException extends
+                // RuntimeException, so existing catch clauses still work).
+                throw new IgniteException("Failed to perform cache update: node is stopping.");
+            }
+
+            // Proceed when page memories still have room for dirty pages, or when this thread
+            // already holds the read lock re-entrantly (it must not block on itself).
+            if (safeToUpdatePageMemories() || checkpointLock.getReadHoldCount() > 1)
+                break;
+            else {
+                // Too many dirty pages: release the lock, trigger a checkpoint and wait for
+                // it to begin before retrying.
+                checkpointLock.readLock().unlock();
+
+                try {
+                    checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to wait for checkpoint begin.", e);
+                }
+            }
+        }
+
+        // Bookkeeping for checkpointLockIsHeldByThread(); mirrored by checkpointReadUnlock().
+        if (ASSERTION_ENABLED)
+            CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean checkpointLockIsHeldByThread() {
+        // With assertions disabled the hold counter is not maintained, so report "held".
+        if (!ASSERTION_ENABLED)
+            return true;
+
+        // The checkpointer owns the write lock; everyone else bumps the hold counter.
+        if (checkpointLock.isWriteLockedByCurrentThread())
+            return true;
+
+        return CHECKPOINT_LOCK_HOLD_COUNT.get() > 0;
+    }
+
+    /**
+     * @return {@code true} if all PageMemory instances are safe to update.
+     */
+    private boolean safeToUpdatePageMemories() {
+        Collection<MemoryPolicy> plcs = context().database().memoryPolicies();
+
+        // No configured policies: nothing can run out of room.
+        if (plcs == null)
+            return true;
+
+        boolean safe = true;
+
+        for (MemoryPolicy plc : plcs) {
+            if (!((PageMemoryEx)plc.pageMemory()).safeToUpdate()) {
+                safe = false;
+
+                break;
+            }
+        }
+
+        return safe;
+    }
+
+    /**
+     * Releases the checkpoint read lock.
+     */
+    @Override public void checkpointReadUnlock() {
+        // The checkpointer thread uses the write lock, and checkpointReadLock() returned
+        // without locking in that case — so there is nothing to release here.
+        if (checkpointLock.writeLock().isHeldByCurrentThread())
+            return;
+
+        checkpointLock.readLock().unlock();
+
+        // After releasing the lock, nudge the checkpointer if any page memory has
+        // accumulated too many dirty pages to stay safe for updates.
+        if (checkpointer != null) {
+            Collection<MemoryPolicy> memPlcs = context().database().memoryPolicies();
+
+            if (memPlcs != null) {
+                for (MemoryPolicy memPlc : memPlcs) {
+                    PageMemoryEx mem = (PageMemoryEx)memPlc.pageMemory();
+
+                    if (mem != null && !mem.safeToUpdate()) {
+                        checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        // Mirror of the increment in checkpointReadLock(); consumed by
+        // checkpointLockIsHeldByThread().
+        if (ASSERTION_ENABLED)
+            CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
+    }
+
+    /**
+     * Restores database state from WAL (applies updates logged since the last checkpoint)
+     * and then starts the checkpointer thread.
+     *
+     * @throws IgniteCheckedException If failed to restore database status from WAL.
+     */
+    private void restoreState() throws IgniteCheckedException {
+        try {
+            CheckpointStatus status = readCheckpointStatus();
+
+            // Apply updates under the checkpoint read lock so no checkpoint can run
+            // concurrently with the recovery.
+            checkpointReadLock();
+
+            try {
+                applyLastUpdates(status);
+            }
+            finally {
+                checkpointReadUnlock();
+            }
+
+            snapshotMgr.restoreState();
+
+            // Start the checkpointer only after recovery completed successfully.
+            checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
+
+            new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
+        }
+        catch (StorageException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
+        assert reservedForExchange == null : reservedForExchange;
+
+        reservedForExchange = new HashMap<>();
+
+        // For every owned partition that is large enough for WAL rebalancing, reserve WAL
+        // history starting from the checkpoint containing its counter.
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
+                continue;
+
+            for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
+                boolean tooSmall = part.dataStore().fullSize() <= walRebalanceThreshold;
+
+                if (part.state() != GridDhtPartitionState.OWNING || tooSmall)
+                    continue;
+
+                CheckpointEntry cpEntry = searchCheckpointEntry(grp.groupId(), part.id(), null);
+
+                if (cpEntry == null)
+                    continue;
+
+                try {
+                    if (cctx.wal().reserve(cpEntry.cpMark)) {
+                        Map<Integer, T2<Long, WALPointer>> partMap = reservedForExchange.get(grp.groupId());
+
+                        if (partMap == null) {
+                            partMap = new HashMap<>();
+
+                            reservedForExchange.put(grp.groupId(), partMap);
+                        }
+
+                        partMap.put(part.id(),
+                            new T2<>(cpEntry.partitionCounter(grp.groupId(), part.id()), cpEntry.cpMark));
+                    }
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Error while trying to reserve history", ex);
+                }
+            }
+        }
+
+        // Strip WAL pointers from the result: callers only need the partition counters.
+        Map<Integer, Map<Integer, Long>> resMap = new HashMap<>();
+
+        for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
+            Map<Integer, Long> cntrs = new HashMap<>();
+
+            for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet())
+                cntrs.put(e0.getKey(), e0.getValue().get1());
+
+            resMap.put(e.getKey(), cntrs);
+        }
+
+        return resMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void releaseHistoryForExchange() {
+        // Nothing was reserved, or it has already been released.
+        if (reservedForExchange == null)
+            return;
+
+        // Release every WAL pointer taken by reserveHistoryForExchange(); a failure on one
+        // partition must not prevent releasing the rest.
+        for (Map<Integer, T2<Long, WALPointer>> partMap : reservedForExchange.values()) {
+            for (T2<Long, WALPointer> reservation : partMap.values()) {
+                try {
+                    cctx.wal().release(reservation.get2());
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Could not release history lock", ex);
+                }
+            }
+        }
+
+        reservedForExchange = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
+        // Find a checkpoint whose WAL mark covers the requested partition counter.
+        CheckpointEntry cpEntry = searchCheckpointEntry(grpId, partId, cntr);
+
+        WALPointer ptr = cpEntry == null ? null : cpEntry.cpMark;
+
+        if (ptr == null)
+            return false;
+
+        boolean reserved = false;
+
+        try {
+            reserved = cctx.wal().reserve(ptr);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Error while trying to reserve history", e);
+        }
+
+        // Remember the reservation so releaseHistoryForPreloading() can undo it later.
+        if (reserved)
+            reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr));
+
+        return reserved;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releaseHistoryForPreloading() {
+        // Releases every WAL reservation made by reserveHistoryForPreloading().
+        // NOTE(review): if release() throws, the loop aborts and clear() below is skipped,
+        // so entries released before the failure remain in the map; a subsequent call would
+        // release them a second time. Confirm whether wal().release() tolerates that.
+        for (Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>> e : reservedForPreloading.entrySet()) {
+            try {
+                cctx.wal().release(e.getValue().get2());
+            }
+            catch (IgniteCheckedException ex) {
+                U.error(log, "Could not release WAL reservation", ex);
+
+                throw new IgniteException(ex);
+            }
+        }
+
+        reservedForPreloading.clear();
+    }
+
+    /**
+     * For debugging only. TODO: remove.
+     *
+     * @return Live map of outstanding preloading WAL reservations, keyed by
+     *      (group ID, partition ID) with (counter, WAL pointer) values.
+     */
+    public Map<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading() {
+        return reservedForPreloading;
+    }
+
+    /**
+     * Triggers a checkpoint if the checkpointer has been started.
+     *
+     * @param reason Human-readable checkpoint reason.
+     * @return Future that completes when the triggered checkpoint begins, or {@code null}
+     *      if the checkpointer is not running.
+     */
+    @Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
+        // Read the field once so the null check and the call use the same instance.
+        Checkpointer cp = checkpointer;
+
+        if (cp != null)
+            return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void waitForCheckpoint(String reason) throws IgniteCheckedException {
+        // Read the field once so the null check and subsequent calls use the same instance.
+        Checkpointer cp = checkpointer;
+
+        if (cp == null)
+            return;
+
+        CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason);
+
+        IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
+
+        fut1.get();
+
+        if (!progSnapshot.started)
+            return;
+
+        // The first snapshot referred to a checkpoint that was already in progress —
+        // presumably what 'started' indicates (TODO confirm) — so it may not cover the most
+        // recent updates: trigger and await one more checkpoint.
+        IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
+
+        assert fut1 != fut2;
+
+        fut2.get();
+    }
+
+    /**
+     * Tries to search for a WAL pointer for the given partition counter start.
+     *
+     * @param grpId Cache group ID.
+     * @param part Partition ID.
+     * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+     * @return WAL pointer of the matching checkpoint mark, or {@code null} if failed to search.
+     *      (Fixed javadoc: the old comment said "Checkpoint entry" but this method returns the pointer.)
+     */
+    @Nullable public WALPointer searchPartitionCounter(int grpId, int part, @Nullable Long partCntrSince) {
+        CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince);
+
+        if (entry == null)
+            return null;
+
+        return entry.cpMark;
+    }
+
+    /**
+     * Tries to search for a WAL pointer for the given partition counter start.
+     *
+     * @param grpId Cache group ID.
+     * @param part Partition ID.
+     * @param partCntrSince Partition counter or {@code null} to search for minimal counter.
+     * @return Checkpoint entry or {@code null} if failed to search.
+     */
+    @Nullable private CheckpointEntry searchCheckpointEntry(int grpId, int part, @Nullable Long partCntrSince) {
+        // 'hasGap' is raised when a checkpoint has no counter for this partition (or cannot
+        // be read): history spanning such a gap is unusable, so the candidate is discarded.
+        boolean hasGap = false;
+        CheckpointEntry first = null;
+
+        // Iterates checkpoints in the order produced by the history — presumably oldest
+        // first, since the earliest usable entry wins (TODO confirm ordering contract).
+        for (Long cpTs : checkpointHist.checkpoints()) {
+            try {
+                CheckpointEntry entry = checkpointHist.entry(cpTs);
+
+                Long foundCntr = entry.partitionCounter(grpId, part);
+
+                if (foundCntr != null) {
+                    if (partCntrSince == null) {
+                        // Looking for the minimal counter: keep the first entry after any gap.
+                        if (hasGap) {
+                            first = entry;
+
+                            hasGap = false;
+                        }
+
+                        if (first == null)
+                            first = entry;
+                    }
+                    else if (foundCntr <= partCntrSince) {
+                        // Still at or below the requested counter: this entry is the best so far.
+                        first = entry;
+
+                        hasGap = false;
+                    }
+                    else
+                        // Counter overshot the target: the previous candidate (if any) wins.
+                        return hasGap ? null : first;
+                }
+                else
+                    hasGap = true;
+            }
+            catch (IgniteCheckedException ignore) {
+                // Treat exception the same way as a gap.
+                hasGap = true;
+            }
+        }
+
+        return hasGap ? null : first;
+    }
+
+    /**
+     * @return Checkpoint history (exposed for tests only).
+     */
+    public CheckpointHistory checkpointHistory() {
+        return checkpointHist;
+    }
+
+    /**
+     * @return Checkpoint directory where start/end marker files are stored.
+     */
+    public File checkpointDirectory() {
+        return cpDir;
+    }
+
+    /**
+     * Registers a checkpoint lifecycle listener.
+     *
+     * @param lsnr Listener.
+     */
+    public void addCheckpointListener(DbCheckpointListener lsnr) {
+        lsnrs.add(lsnr);
+    }
+
+    /**
+     * Deregisters a previously added checkpoint lifecycle listener.
+     *
+     * @param lsnr Listener.
+     */
+    public void removeCheckpointListener(DbCheckpointListener lsnr) {
+        lsnrs.remove(lsnr);
+    }
+
+    /**
+     * Scans the checkpoint directory for the latest START and END marker files and reads
+     * their WAL pointers.
+     *
+     * @return Read checkpoint status.
+     * @throws IgniteCheckedException If failed to read checkpoint status page.
+     */
+    @SuppressWarnings("TooBroadScope")
+    private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
+        long lastStartTs = 0;
+        long lastEndTs = 0;
+
+        UUID startId = CheckpointStatus.NULL_UUID;
+        UUID endId = CheckpointStatus.NULL_UUID;
+
+        File startFile = null;
+        File endFile = null;
+
+        WALPointer startPtr = CheckpointStatus.NULL_PTR;
+        WALPointer endPtr = CheckpointStatus.NULL_PTR;
+
+        File dir = cpDir;
+
+        if (!dir.exists()) {
+            // TODO: remove excessive logging after GG-12116 fix.
+            File[] files = dir.listFiles();
+
+            if (files != null && files.length > 0) {
+                log.warning("Read checkpoint status: cpDir.exists() is false, cpDir.listFiles() is: " +
+                    Arrays.toString(files));
+            }
+
+            if (Files.exists(dir.toPath()))
+                log.warning("Read checkpoint status: cpDir.exists() is false, Files.exists(cpDir) is true.");
+
+            log.info("Read checkpoint status: checkpoint directory is not found.");
+
+            return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
+        }
+
+        File[] files = dir.listFiles();
+
+        // Fixed: listFiles() returns null on an I/O error even for an existing directory;
+        // the original iterated unconditionally and could throw an NPE here.
+        if (files != null) {
+            for (File file : files) {
+                Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
+
+                if (matcher.matches()) {
+                    long ts = Long.parseLong(matcher.group(1));
+                    UUID id = UUID.fromString(matcher.group(2));
+                    CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
+
+                    // Keep only the most recent marker of each type.
+                    if (type == CheckpointEntryType.START && ts > lastStartTs) {
+                        lastStartTs = ts;
+                        startId = id;
+                        startFile = file;
+                    }
+                    else if (type == CheckpointEntryType.END && ts > lastEndTs) {
+                        lastEndTs = ts;
+                        endId = id;
+                        endFile = file;
+                    }
+                }
+            }
+        }
+
+        // 20 bytes: WAL pointer layout is (long, int, int) — see readPointer().
+        ByteBuffer buf = ByteBuffer.allocate(20);
+        buf.order(ByteOrder.nativeOrder());
+
+        if (startFile != null)
+            startPtr = readPointer(startFile, buf);
+
+        if (endFile != null)
+            endPtr = readPointer(endFile, buf);
+
+        // TODO: remove excessive logging after GG-12116 fix.
+        log.info("Read checkpoint status: start marker = " + startFile + ", end marker = " + endFile);
+
+        return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
+    }
+
+    /**
+     * Reads a WAL pointer (20 bytes: long index, int file offset, int length) from a
+     * checkpoint marker file.
+     *
+     * @param cpMarkerFile Checkpoint mark file.
+     * @param buf Reusable read buffer in native byte order; assumed to have capacity 20
+     *      (allocated by readCheckpointStatus()).
+     * @return WAL pointer.
+     * @throws IgniteCheckedException If failed to read mark file.
+     */
+    private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
+        // Fixed: clear() resets both position and limit. The buffer is reused across calls
+        // and a previous flip() may have left a reduced limit; the original position(0)
+        // did not restore it.
+        buf.clear();
+
+        try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), StandardOpenOption.READ)) {
+            // Fixed: a single read() may legally transfer fewer bytes than requested. Loop
+            // until the buffer is full; fail explicitly on a truncated marker file instead
+            // of parsing a partially-filled buffer.
+            while (buf.hasRemaining()) {
+                if (ch.read(buf) == -1)
+                    throw new IOException("Unexpected end of file while reading checkpoint pointer.");
+            }
+
+            buf.flip();
+
+            return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " +
+                cpMarkerFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Replays WAL from the last completed checkpoint and, if the node crashed in the middle
+     * of a checkpoint, re-applies physical page changes to page memory.
+     *
+     * @param status Checkpoint status.
+     * @return WAL pointer immediately past the last record read, or {@code null} if no
+     *      records were read.
+     * @throws IgniteCheckedException If the checkpoint marker exists on disk but the matching
+     *      checkpoint record is missing from WAL, or if replay fails.
+     */
+    private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
+        if (log.isInfoEnabled())
+            log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
+                + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
+
+        // 'apply' stays true until the checkpoint start record from 'status' is found in WAL.
+        boolean apply = status.needRestoreMemory();
+
+        if (apply) {
+            U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore memory state and " +
+                "enforce checkpoint on node start.");
+
+            cctx.pageStore().beginRecover();
+        }
+
+        long start = U.currentTimeMillis();
+        int applied = 0;
+        WALPointer lastRead = null;
+
+        try (WALIterator it = cctx.wal().replay(status.endPtr)) {
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+                WALRecord rec = tup.get2();
+
+                lastRead = tup.get1();
+
+                switch (rec.type()) {
+                    case CHECKPOINT_RECORD:
+                        CheckpointRecord cpRec = (CheckpointRecord)rec;
+
+                        // We roll memory up until we find a checkpoint start record registered in the status.
+                        if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
+                            log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
+                                ", pos=" + tup.get1() + ']');
+
+                            apply = false;
+                        }
+                        else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
+                            U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
+                                ", expCpId=" + status.cpStartId + ", pos=" + tup.get1() + ']');
+
+                        break;
+
+                    case PAGE_RECORD:
+                        // Full page snapshot: overwrite the whole page content.
+                        if (apply) {
+                            PageSnapshot pageRec = (PageSnapshot)rec;
+
+                            // Here we do not require tag check because we may be applying memory changes after
+                            // several repetitive restarts and the same pages may have changed several times.
+                            int cacheId = pageRec.fullPageId().cacheId();
+                            long pageId = pageRec.fullPageId().pageId();
+
+                            PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId);
+
+                            long page = pageMem.acquirePage(cacheId, pageId, true);
+
+                            try {
+                                long pageAddr = pageMem.writeLock(cacheId, pageId, page);
+
+                                try {
+                                    PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
+                                }
+                                finally {
+                                    pageMem.writeUnlock(cacheId, pageId, page, null, true, true);
+                                }
+                            }
+                            finally {
+                                pageMem.releasePage(cacheId, pageId, page);
+                            }
+
+                            applied++;
+                        }
+
+                        break;
+
+                    case PARTITION_DESTROY:
+                        // Evict all pages of the destroyed partition from page memory.
+                        if (apply) {
+                            PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec;
+
+                            final int cId = destroyRec.cacheId();
+                            final int pId = destroyRec.partitionId();
+
+                            PageMemoryEx pageMem = getPageMemoryForCacheGroup(cId);
+
+                            pageMem.clearAsync(new P3<Integer, Long, Integer>() {
+                                @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
+                                    return cacheId == cId && PageIdUtils.partId(pageId) == pId;
+                                }
+                            }, true).get();
+                        }
+
+                        break;
+
+                    default:
+                        // Incremental page changes (delta records) are re-applied to the page.
+                        if (apply && rec instanceof PageDeltaRecord) {
+                            PageDeltaRecord r = (PageDeltaRecord)rec;
+
+                            int cacheId = r.cacheId();
+                            long pageId = r.pageId();
+
+                            PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId);
+
+                            // Here we do not require tag check because we may be applying memory changes after
+                            // several repetitive restarts and the same pages may have changed several times.
+                            long page = pageMem.acquirePage(cacheId, pageId, true);
+
+                            try {
+                                long pageAddr = pageMem.writeLock(cacheId, pageId, page);
+
+                                try {
+                                    r.applyDelta(pageMem, pageAddr);
+                                }
+                                finally {
+                                    pageMem.writeUnlock(cacheId, pageId, page, null, true, true);
+                                }
+                            }
+                            finally {
+                                pageMem.releasePage(cacheId, pageId, page);
+                            }
+
+                            applied++;
+                        }
+                }
+            }
+        }
+
+        if (status.needRestoreMemory()) {
+            // 'apply' still true here means the expected checkpoint record never appeared.
+            if (apply)
+                throw new IgniteCheckedException("Failed to restore memory state (checkpoint marker is present " +
+                    "on disk, but checkpoint record is missed in WAL) " +
+                    "[cpStatus=" + status + ", lastRead=" + lastRead + "]");
+
+            log.info("Finished applying memory changes [changesApplied=" + applied +
+                ", time=" + (U.currentTimeMillis() - start) + "ms]");
+
+            if (applied > 0)
+                finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr);
+        }
+
+        checkpointHist.loadHistory(cpDir);
+
+        return lastRead == null ? null : lastRead.next();
+    }
+
+    /**
+     * Obtains PageMemory reference from cache descriptor instead of cache context.
+     *
+     * @param grpId Cache group id.
+     * @return PageMemoryEx instance.
+     * @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor.
+     */
+    private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
+        // TODO IGNITE-5075: cache descriptor can be removed.
+        GridCacheSharedContext sharedCtx = context();
+
+        // Resolve the memory policy name through the group descriptor's configuration.
+        String plcName = sharedCtx.cache().cacheGroupDescriptors().get(grpId).config().getMemoryPolicyName();
+
+        MemoryPolicy plc = sharedCtx.database().memoryPolicy(plcName);
+
+        return (PageMemoryEx)plc.pageMemory();
+    }
+
+    /**
+     * Applies logical data updates logged in WAL after the last checkpoint and then restores
+     * partition states collected during the replay.
+     *
+     * @param status Last registered checkpoint status.
+     * @throws IgniteCheckedException If failed to apply updates.
+     * @throws StorageException If IO exception occurred while reading write-ahead log.
+     */
+    private void applyLastUpdates(CheckpointStatus status) throws IgniteCheckedException {
+        if (log.isInfoEnabled())
+            log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+                + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
+
+        // Disable query field lookup for the duration of the replay; re-enabled in the
+        // finally block below.
+        cctx.kernalContext().query().skipFieldLookup(true);
+
+        long start = U.currentTimeMillis();
+        int applied = 0;
+
+        try (WALIterator it = cctx.wal().replay(status.startPtr)) {
+            // Latest observed (state, update counter) per (group ID, partition ID).
+            Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
+
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+
+                WALRecord rec = next.get2();
+
+                switch (rec.type()) {
+                    case DATA_RECORD:
+                        DataRecord dataRec = (DataRecord)rec;
+
+                        for (DataEntry dataEntry : dataRec.writeEntries()) {
+                            int cacheId = dataEntry.cacheId();
+
+                            // NOTE(review): cacheContext() may presumably return null for a
+                            // cache that no longer exists — confirm whether such records can
+                            // reach this point before relying on a non-null context.
+                            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+                            applyUpdate(cacheCtx, dataEntry);
+
+                            applied++;
+                        }
+
+                        break;
+
+                    case PART_META_UPDATE_STATE:
+                        PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
+
+                        // Later records overwrite earlier ones for the same partition.
+                        partStates.put(new T2<>(metaStateRecord.cacheId(), metaStateRecord.partitionId()),
+                            new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
+
+                        break;
+
+                    default:
+                        // Skip other records.
+                }
+            }
+
+            restorePartitionState(partStates);
+        }
+        finally {
+            cctx.kernalContext().query().skipFieldLookup(false);
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Finished applying WAL changes [updatesApplied=" + applied +
+                ", time=" + (U.currentTimeMillis() - start) + "ms]");
+    }
+
+    /**
+     * Restores partition states and initial update counters, preferring the states collected
+     * from WAL and falling back to the state persisted in each partition's meta page.
+     *
+     * @param partStates Partition states collected from WAL, keyed by (group ID, partition ID).
+     * @throws IgniteCheckedException If failed to restore.
+     */
+    private void restorePartitionState(
+        Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
+    ) throws IgniteCheckedException {
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            int grpId = grp.groupId();
+
+            PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
+
+            for (int i = 0; i < grp.affinity().partitions(); i++) {
+                if (storeMgr.exists(grpId, i)) {
+                    storeMgr.ensure(grpId, i);
+
+                    // Stores with a single page presumably hold only the header page and
+                    // nothing to restore — skip them (TODO confirm).
+                    if (storeMgr.pages(grpId, i) <= 1)
+                        continue;
+
+                    long partMetaId = pageMem.partitionMetaPageId(grpId, i);
+                    long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+                    try {
+                        long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
+
+                        // Whether the meta page was modified (passed to writeUnlock as the dirty flag).
+                        boolean changed = false;
+
+                        try {
+                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+                            T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
+
+                            GridDhtLocalPartition part = grp.topology()
+                                .localPartition(i, AffinityTopologyVersion.NONE, true);
+
+                            assert part != null;
+
+                            if (fromWal != null) {
+                                // WAL has a newer state: write it both to the meta page and
+                                // to the in-memory partition.
+                                int stateId = fromWal.get1();
+
+                                io.setPartitionState(pageAddr, (byte)stateId);
+
+                                changed = updateState(part, stateId);
+
+                                if (stateId == GridDhtPartitionState.OWNING.ordinal()) {
+                                    grp.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2());
+
+                                    if (part.initialUpdateCounter() < fromWal.get2()) {
+                                        part.initialUpdateCounter(fromWal.get2());
+
+                                        changed = true;
+                                    }
+                                }
+                            }
+                            else
+                                // No WAL record: restore the state persisted in the meta page.
+                                changed = updateState(part, (int)io.getPartitionState(pageAddr));
+                        }
+                        finally {
+                            pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
+                        }
+                    }
+                    finally {
+                        pageMem.releasePage(grpId, partMetaId, partMetaPage);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Applies a restored state ordinal to the given partition.
+     *
+     * @param part Partition to restore state for.
+     * @param stateId State enum ordinal.
+     * @return Updated flag.
+     */
+    private boolean updateState(GridDhtLocalPartition part, int stateId) {
+        // -1 means the persisted state is unknown: leave the partition untouched.
+        if (stateId == -1)
+            return false;
+
+        GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
+
+        assert state != null;
+
+        // EVICTED partitions are restored as RENTING; all other states are kept as-is.
+        part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
+
+        return true;
+    }
+
+    /**
+     * Applies a single WAL data entry to the cache's off-heap storage.
+     *
+     * @param cacheCtx Cache context to apply an update.
+     * @param dataEntry Data entry to apply.
+     * @throws IgniteCheckedException If the entry carries an unsupported operation or the
+     *      off-heap update fails.
+     */
+    private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
+        int partId = dataEntry.partitionId();
+
+        GridDhtLocalPartition locPart = cacheCtx.topology()
+            .localPartition(partId, AffinityTopologyVersion.NONE, true);
+
+        switch (dataEntry.op()) {
+            case CREATE:
+            case UPDATE:
+                cacheCtx.offheap().update(
+                    cacheCtx,
+                    dataEntry.key(),
+                    dataEntry.value(),
+                    dataEntry.writeVersion(),
+                    0L,
+                    locPart,
+                    null);
+
+                break;
+
+            case DELETE:
+                cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
+
+                break;
+
+            default:
+                throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
+        }
+
+        // Common to both branches (the default case throws before reaching this point):
+        // bump the initial partition counter when the entry carries one.
+        if (dataEntry.partitionCounter() != 0)
+            cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void finalizeCheckpointOnRecovery(long cpTs, UUID cpId, WALPointer walPtr) throws IgniteCheckedException {
+        assert cpTs != 0;
+
+        ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
+
+        long start = System.currentTimeMillis();
+
+        Collection<MemoryPolicy> memPolicies = context().database().memoryPolicies();
+
+        List<IgniteBiTuple<PageMemory, Collection<FullPageId>>> cpEntities = new ArrayList<>(memPolicies.size());
+
+        for (MemoryPolicy memPlc : memPolicies) {
+            PageMemoryEx pageMem = (PageMemoryEx) memPlc.pageMemory();
+            cpEntities.add(new IgniteBiTuple<PageMemory, Collection<FullPageId>>(pageMem,
+                (pageMem).beginCheckpoint()));
+        }
+
+        tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+        // Identity stores set.
+        Collection<PageStore> updStores = new HashSet<>();
+
+        int cpPagesCnt = 0;
+
+        for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
+            PageMemoryEx pageMem = (PageMemoryEx) e.get1();
+
+            Collection<FullPageId> cpPages = e.get2();
+
+            cpPagesCnt += cpPages.size();
+
+            for (FullPageId fullId : cpPages) {
+                tmpWriteBuf.rewind();
+
+                Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null);
+
+                if (tag != null) {
+                    tmpWriteBuf.rewind();
+
+                    PageStore store = storeMgr.writeInternal(fullId.cacheId(), fullId.pageId(), tmpWriteBuf, tag);
+
+                    tmpWriteBuf.rewind();
+
+                    updStores.add(store);
+                }
+            }
+        }
+
+        long written = U.currentTimeMillis();
+
+        for (PageStore updStore : updStores)
+            updStore.sync();
+
+        long fsync = U.currentTimeMillis();
+
+        for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities)
+            ((PageMemoryEx)e.get1()).finishCheckpoint();
+
+        writeCheckpointEntry(
+            tmpWriteBuf,
+            cpTs,
+            cpId,
+            walPtr,
+            null,
+            CheckpointEntryType.END);
+
+        cctx.pageStore().finishRecover();
+
+        if (log.isInfoEnabled())
+            log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
+                    "pagesWrite=%dms, fsync=%dms, total=%dms]",
+                cpId,
+                cpPagesCnt,
+                walPtr,
+                written - start,
+                fsync - written,
+                fsync - start));
+    }
+
    /**
     * Writes a checkpoint marker file ({@code <cpTs>-<cpId>-<type>.bin}) containing the serialized
     * WAL pointer of the checkpoint.
     * <p>
     * Marker layout: 8-byte WAL segment index, 4-byte file offset, 4-byte record length. Byte order
     * is whatever the caller configured on {@code tmpWriteBuf} (callers in this file set native order).
     *
     * @param tmpWriteBuf Temporary write buffer; its contents are overwritten and it is cleared on exit.
     * @param cpTs Checkpoint timestamp, used in the marker file name.
     * @param cpId Checkpoint ID.
     * @param ptr Wal pointer of current checkpoint. Must be a {@code FileWALPointer}.
     * @param rec Checkpoint record; may be {@code null} for END markers (only read for START).
     * @param type Marker type (START or END).
     * @return New {@code CheckpointEntry} for START markers, {@code null} for END markers.
     * @throws IgniteCheckedException If the marker file could not be created or written.
     */
    private CheckpointEntry writeCheckpointEntry(
        ByteBuffer tmpWriteBuf,
        long cpTs,
        UUID cpId,
        WALPointer ptr,
        CheckpointRecord rec,
        CheckpointEntryType type
    ) throws IgniteCheckedException {
        assert ptr instanceof FileWALPointer;

        FileWALPointer filePtr = (FileWALPointer)ptr;

        String fileName = checkpointFileName(cpTs, cpId, type);

        // CREATE_NEW: a marker for the same (ts, id, type) must not already exist.
        try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
            StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {

            tmpWriteBuf.rewind();

            // 8-byte WAL segment index.
            tmpWriteBuf.putLong(filePtr.index());

            // 4-byte offset within the segment file.
            tmpWriteBuf.putInt(filePtr.fileOffset());

            // 4-byte record length.
            tmpWriteBuf.putInt(filePtr.length());

            tmpWriteBuf.flip();

            ch.write(tmpWriteBuf);

            tmpWriteBuf.clear();

            // Force the marker to disk unless fsync is explicitly disabled (test mode).
            if (!skipSync)
                ch.force(true);

            // Only START markers yield an in-memory entry; 'rec' is dereferenced only in that case.
            return type == CheckpointEntryType.START ?
                new CheckpointEntry(cpTs, ptr, cpId, rec.cacheGroupStates()) : null;
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
    }
+
+    /**
+     * @param cpTs Checkpoint timestamp.
+     * @param cpId Checkpoint ID.
+     * @param type Checkpoint type.
+     * @return Checkpoint file name.
+     */
+    private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
+        return cpTs + "-" + cpId + "-" + type + ".bin";
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings("NakedNotify")
+    public class Checkpointer extends GridWorker {
+        /** Temporary write buffer. */
+        private final ByteBuffer tmpWriteBuf;
+
+        /** Next scheduled checkpoint progress. */
+        private volatile CheckpointProgress scheduledCp;
+
+        /** Current checkpoint. This field is updated only by checkpoint thread. */
+        private volatile CheckpointProgress curCpProgress;
+
+        /** Shutdown now. */
+        private volatile boolean shutdownNow;
+
+        /**
+         * @param gridName Grid name.
+         * @param name Thread name.
+         * @param log Logger.
+         */
+        protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
+            super(gridName, name, log);
+
+            scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
+
+            tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
+
+            tmpWriteBuf.order(ByteOrder.nativeOrder());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                waitCheckpointEvent();
+
+                GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
+
+                if (enableChangeApplied != null) {
+                    enableChangeApplied.onDone();
+
+                    GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
+                }
+
+                if (checkpointsEnabled)
+                    doCheckpoint();
+                else {
+                    synchronized (this) {
+                        scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
+                    }
+                }
+            }
+
+            // Final run after the cancellation.
+            if (checkpointsEnabled && !shutdownNow)
+                doCheckpoint();
+
+            scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+        }
+
+        /**
+         *
+         */
+        private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
+            CheckpointProgress sched = scheduledCp;
+
+            long next = U.currentTimeMillis() + delayFromNow;
+
+            if (sched.nextCpTs <= next)
+                return new CheckpointProgressSnapshot(sched);
+
+            CheckpointProgressSnapshot ret;
+
+            synchronized (this) {
+                sched = scheduledCp;
+
+                if (sched.nextCpTs > next) {
+                    sched.reason = reason;
+
+                    sched.nextCpTs = next;
+                }
+
+                ret = new CheckpointProgressSnapshot(sched);
+
+                notifyAll();
+            }
+
+            return ret;
+        }
+
+        /**
+         * @param snapshotOperation Snapshot operation.
+         */
+        public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
+            GridFutureAdapter<Object> ret;
+
+            synchronized (this) {
+                scheduledCp.nextCpTs = U.currentTimeMillis();
+
+                scheduledCp.reason = "snapshot";
+
+                scheduledCp.nextSnapshot = true;
+
+                scheduledCp.snapshotOperation = snapshotOperation;
+
+                ret = scheduledCp.cpBeginFut;
+
+                notifyAll();
+            }
+
+            return ret;
+        }
+
+        /**
+         *
+         */
+        private void doCheckpoint() {
+            try {
+                CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
+
+                Checkpoint chp = markCheckpointBegin(tracker);
+
+                if (chp.cpPages == null)
+                    return;
+
+                snapshotMgr.onCheckPointBegin();
+
+                boolean interrupted = true;
+
+                try {
+                    if (chp.hasDelta()) {
+                        // Identity stores set.
+                        GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<>();
+
+                        CountDownFuture doneWriteFut = new CountDownFuture(
+                            asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
+
+                        tracker.onPagesWriteStart();
+
+                        if (asyncRunner != null) {
+                            for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
+                                Runnable write = new WriteCheckpointPages(
+                                    tracker,
+                                    chp.cpPages.innerCollection(i),
+                                    updStores,
+                                    doneWriteFut
+                                );
+
+                                try {
+                                    asyncRunner.execute(write);
+                                }
+                                catch (RejectedExecutionException ignore) {
+                                    // Run the task synchronously.
+                                    write.run();
+                                }
+                            }
+                        }
+                        else {
+                            // Single-threaded checkpoint.
+                            Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut);
+
+                            write.run();
+                        }
+
+                        // Wait and check for errors.
+                        doneWriteFut.get();
+
+                        // Must re-check shutdown flag here because threads may have skipped some pages.
+                        // If so, we should not put finish checkpoint mark.
+                        if (shutdownNow) {
+                            chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+
+                            return;
+                        }
+
+                        snapshotMgr.afterCheckpointPageWritten();
+
+                        tracker.onFsyncStart();
+
+                        if (!skipSync) {
+                            for (PageStore updStore : updStores) {
+                                if (shutdownNow) {
+                                    chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+
+                                    return;
+                                }
+
+                                updStore.sync();
+                            }
+                        }
+                    }
+                    else {
+                        tracker.onPagesWriteStart();
+                        tracker.onFsyncStart();
+                    }
+
+                    // Must mark successful checkpoint only if there are no exceptions or interrupts.
+                    interrupted = false;
+                }
+                finally {
+                    if (!interrupted)
+                        markCheckpointEnd(chp);
+                }
+
+                tracker.onEnd();
+
+                if (chp.hasDelta()) {
+                    if (printCheckpointStats) {
+                        if (log.isInfoEnabled())
+                            log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
+                                    "walSegmentsCleared=%d, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
+                                    "total=%dms]",
+                                chp.cpEntry.checkpointId(),
+                                chp.pagesSize,
+                                chp.cpEntry.checkpointMark(),
+                                chp.walFilesDeleted,
+                                tracker.markDuration(),
+                                tracker.pagesWriteDuration(),
+                                tracker.fsyncDuration(),
+                                tracker.totalDuration()));
+                    }
+
+                    persStoreMetrics.onCheckpoint(
+                        tracker.lockWaitDuration(),
+                        tracker.markDuration(),
+                        tracker.pagesWriteDuration(),
+                        tracker.fsyncDuration(),
+                        tracker.totalDuration(),
+                        chp.pagesSize,
+                        tracker.dataPagesWritten(),
+                        tracker.cowPagesWritten());
+                }
+                else {
+                    persStoreMetrics.onCheckpoint(
+                        tracker.lockWaitDuration(),
+                        tracker.markDuration(),
+                        tracker.pagesWriteDuration(),
+                        tracker.fsyncDuration(),
+                        tracker.totalDuration(),
+                        chp.pagesSize,
+                        tracker.dataPagesWritten(),
+                        tracker.cowPagesWritten());
+                }
+            }
+            catch (IgniteCheckedException e) {
+                // TODO-ignite-db how to handle exception?
+                U.error(log, "Failed to create checkpoint.", e);
+            }
+        }
+
+        /**
+         *
+         */
+        @SuppressWarnings("WaitNotInLoop")
+        private void waitCheckpointEvent() {
+            boolean cancel = false;
+
+            try {
+                long now = U.currentTimeMillis();
+
+                synchronized (this) {
+                    long remaining;
+
+                    while ((remaining = scheduledCp.nextCpTs - now) > 0 && !isCancelled()) {
+                        wait(remaining);
+
+                        now = U.currentTimeMillis();
+                    }
+                }
+            }
+            catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+
+                cancel = true;
+            }
+
+            if (cancel)
+                isCancelled = true;
+        }
+
+        /**
+         *
+         */
+        @SuppressWarnings("TooBroadScope")
+        private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
+            CheckpointRecord cpRec = new CheckpointRecord(null, false);
+
+            WALPointer cpPtr = null;
+
+            GridMultiCollectionWrapper<FullPageId> cpPages;
+
+            final CheckpointProgress curr;
+
+            tracker.onLockWaitStart();
+
+            checkpointLock.writeLock().lock();
+
+            try {
+                tracker.onMarkStart();
+
+                synchronized (this) {
+                    curr = scheduledCp;
+
+                    curr.started = true;
+
+                    if (curr.reason == null)
+                        curr.reason = "timeout";
+
+                    // It is important that we assign a new progress object before checkpoint mark in page memory.
+                    scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
+
+                    curCpProgress = curr;
+                }
+
+                final NavigableMap<T2<Integer, Integer>, T2<Integer, Integer>> map =
+                    new TreeMap<>(FullPageIdIterableComparator.INSTANCE);
+
+                DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
+                    @Override public boolean nextSnapshot() {
+                        return curr.nextSnapshot;
+                    }
+
+                    @Override public Map<T2<Integer, Integer>, T2<Integer, Integer>> partitionStatMap() {
+                        return map;
+                    }
+                };
+
+                // Listeners must be invoked before we write checkpoint record to WAL.
+                for (DbCheckpointListener lsnr : lsnrs)
+                    lsnr.onCheckpointBegin(ctx0);
+
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
+                        continue;
+
+                    List<GridDhtLocalPartition> locParts = new ArrayList<>();
+
+                    for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
+                        locParts.add(part);
+
+                    Collections.sort(locParts, ASC_PART_COMPARATOR);
+
+                    CacheState state = new CacheState(locParts.size());
+
+                    for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
+                        state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate());
+
+                    cpRec.addCacheGroupState(grp.groupId(), state);
+                }
+
+                if (curr.nextSnapshot)
+                    snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
+
+                IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints();
+
+                // Todo it maybe more optimally
+                Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
+
+                for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
+                    for (int i = 0; i < col.collectionsSize(); i++)
+                        cpPagesList.addAll(col.innerCollection(i));
+                }
+
+                cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
+
+                if (!F.isEmpty(cpPages)) {
+                    // No page updates for this checkpoint are allowed from now on.
+                    cpPtr = cctx.wal().log(cpRec);
+
+                    if (cpPtr == null)
+                        cpPtr = CheckpointStatus.NULL_PTR;
+                }
+            }
+            finally {
+                checkpointLock.writeLock().unlock();
+
+                tracker.onLockRelease();
+            }
+
+            curr.cpBeginFut.onDone();
+
+            if (!F.isEmpty(cpPages)) {
+                assert cpPtr != null;
+
+                // Sync log outside the checkpoint write lock.
+                cctx.wal().fsync(cpPtr);
+
+                long cpTs = System.currentTimeMillis();
+
+                CheckpointEntry cpEntry = writeCheckpointEntry(
+                    tmpWriteBuf,
+                    cpTs,
+                    cpRec.checkpointId(),
+                    cpPtr,
+                    cpRec,
+                    CheckpointEntryType.START);
+
+                checkpointHist.addCheckpointEntry(cpEntry);
+
+                if (printCheckpointStats)
+                    if (log.isInfoEnabled())
+                        log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
+                                "checkpointLockHoldTime=%dms, pages=%d, reason='%s']",
+                            cpRec.checkpointId(),
+                            cpPtr,
+                            tracker.lockWaitDuration(),
+                            tracker.lockHoldDuration(),
+                            cpPages.size(),
+                            curr.reason)
+                        );
+
+                return new Checkpoint(cpEntry, cpPages, curr);
+            }
+            else {
+                if (printCheckpointStats) {
+                    if (log.isInfoEnabled())
+                        LT.info(log, String.format("Skipping checkpoint (no pages were modified) [" +
+                            "checkpointLockWait=%dms, checkpointLockHoldTime=%dms, reason='%s']",
+                            tracker.lockWaitDuration(),
+                            tracker.lockHoldDuration(),
+                            curr.reason));
+                }
+
+                return new Checkpoint(null, null, curr);
+            }
+        }
+
+        /**
+         * @return tuple with collections of FullPageIds obtained from each PageMemory and overall number of dirty pages.
+         */
+        private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() {
+            Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(memoryPolicies().size());
+
+            int pagesNum = 0;
+
+            for (MemoryPolicy memPlc : memoryPolicies()) {
+                GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx) memPlc.pageMemory()).beginCheckpoint();
+
+                pagesNum += nextCpPagesCol.size();
+
+                res.add(nextCpPagesCol);
+            }
+
+            return new IgniteBiTuple<>(res, pagesNum);
+        }
+
+        /**
+         * @param chp Checkpoint snapshot.
+         */
+        private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
+            synchronized (this) {
+                for (MemoryPolicy memPlc : memoryPolicies())
+                    ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
+
+                if (chp.hasDelta())
+                    writeCheckpointEntry(
+                        tmpWriteBuf,
+                        chp.cpEntry.checkpointTimestamp(),
+                        chp.cpEntry.checkpointId(),
+                        chp.cpEntry.checkpointMark(),
+                        null,
+                        CheckpointEntryType.END);
+            }
+
+            checkpointHist.onCheckpointFinished(chp);
+
+            chp.progress.cpFinishFut.onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            if (log.isDebugEnabled())
+                log.debug("Cancelling grid runnable: " + this);
+
+            // Do not interrupt runner thread.
+            isCancelled = true;
+
+            synchronized (this) {
+                notifyAll();
+            }
+        }
+
+        /**
+         *
+         */
+        public void shutdownNow() {
+            shutdownNow = true;
+
+            if (!isCancelled)
+                cancel();
+        }
+    }
+
+    /**
+     *
+     */
+    private class WriteCheckpointPages implements Runnable {
+        /** */
+        private CheckpointMetricsTracker tracker;
+
+        /** */
+        private Collection<FullPageId> writePageIds;
+
+        /** */
+        private GridConcurrentHashSet<PageStore> updStores;
+
+        /** */
+        private CountDownFuture doneFut;
+
+        /**
+         * @param writePageIds Write page IDs.
+         */
+        private WriteCheckpointPages(
+            CheckpointMetricsTracker tracker,
+            Collection<FullPageId> writePageIds,
+            GridConcurrentHashSet<PageStore> updStores,
+            CountDownFuture doneFut
+        ) {
+            this.tracker = tracker;
+            this.writePageIds = writePageIds;
+            this.updStores = updStores;
+            this.doneFut = doneFut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
+
+            long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
+
+            snapshotMgr.beforeCheckpointPageWritten();
+
+            try {
+                for (FullPageId fullId : writePageIds) {
+                    if (checkpointer.shutdownNow)
+                        break;
+
+                    tmpWriteBuf.rewind();
+
+                    snapshotMgr.beforePageWrite(fullId);
+
+                    int grpId = fullId.cacheId();
+
+                    CacheGroupContext grp = context().cache().cacheGroup(grpId);
+
+                    if (grp == null)
+                        continue;
+
+                    PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
+
+                    Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
+
+                    if (tag != null) {
+                        tmpWriteBuf.rewind();
+
+                        if (persStoreMetrics.metricsEnabled()) {
+                            int pageType = PageIO.getType(tmpWriteBuf);
+
+                            if (PageIO.isDataPageType(pageType))
+                                tracker.onDataPageWritten();
+                        }
+
+                        if (!skipCrc) {
+                            PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
+
+                            tmpWriteBuf.rewind();
+                        }
+
+                        snapshotMgr.onPageWrite(fullId, tmpWriteBuf);
+
+                        tmpWriteBuf.rewind();
+
+                        PageIO.setCrc(writeAddr, 0);
+
+          

<TRUNCATED>

Mime
View raw message