ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [1/5] ignite git commit: IGNITE-6204 Backport optimizations of checkpointing algorithm into 2.2
Date Fri, 15 Sep 2017 08:30:25 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.2 bdaeecca9 -> 42c336827


IGNITE-6204 Backport optimizations of checkpointing algorithm into 2.2

(cherry picked from commit 737874f)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ad99680
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ad99680
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ad99680

Branch: refs/heads/ignite-2.2
Commit: 8ad996802c8a9898fd22fdf74bd1c97120f985ed
Parents: bdaeecc
Author: Ivan Rakov <ivan.glukos@gmail.com>
Authored: Tue Aug 29 13:24:20 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Fri Sep 15 11:26:45 2017 +0300

----------------------------------------------------------------------
 .../configuration/CheckpointWriteOrder.java     |  33 ++++++
 .../PersistentStoreConfiguration.java           |  31 ++++-
 .../internal/pagemem/store/PageStore.java       |   5 +
 .../GridCacheDatabaseSharedManager.java         | 106 +++++++++++++----
 .../cache/persistence/file/FilePageStore.java   |  56 +++++----
 .../persistence/file/FilePageStoreFactory.java  |  35 ++++++
 .../persistence/file/FilePageStoreManager.java  |  17 +--
 .../cache/persistence/file/FilePageStoreV2.java |  53 +++++++++
 .../file/FileVersionCheckingFactory.java        | 116 +++++++++++++++++++
 ...gnitePdsRecoveryAfterFileCorruptionTest.java |   2 +-
 ...nitePersistenceSequentialCheckpointTest.java |  44 +++++++
 .../IgnitePersistentStoreCacheGroupsTest.java   |  31 ++---
 12 files changed, 461 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
new file mode 100644
index 0000000..31feaf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.configuration;
+
+/**
+ * Defines the order in which pages are written to disk storage during a checkpoint.
+ */
+public enum CheckpointWriteOrder {
+    /**
+     * Pages are written in the order provided by the checkpoint pages collection iterator (which is basically a hashtable).
+     */
+    RANDOM,
+
+    /**
+     * All checkpoint pages are collected into a single list and sorted by page index.
+     * Provides almost sequential disk writes, which can be much faster on some SSD models.
+     */
+    SEQUENTIAL
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index e8a0ff4..888bf42 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -16,12 +16,11 @@
  */
 package org.apache.ignite.configuration;
 
+import java.io.Serializable;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
-import java.io.Serializable;
-
 /**
  * Configures Apache Ignite Persistent store.
  */
@@ -45,7 +44,10 @@ public class PersistentStoreConfiguration implements Serializable {
     public static final int DFLT_RATE_TIME_INTERVAL_MILLIS = 60_000;
 
     /** Default number of checkpointing threads. */
-    public static final int DFLT_CHECKPOINTING_THREADS = 1;
+    public static final int DFLT_CHECKPOINTING_THREADS = 4;
+
+    /** Default checkpoint write order. */
+    public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.SEQUENTIAL;
 
     /** Default number of checkpoints to be kept in WAL after checkpoint is finished */
     public static final int DFLT_WAL_HISTORY_SIZE = 20;
@@ -95,6 +97,9 @@ public class PersistentStoreConfiguration implements Serializable {
     /** */
     private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS;
 
+    /** Checkpoint write order. */
+    private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER;
+
     /** Number of checkpoints to keep */
     private int walHistSize = DFLT_WAL_HISTORY_SIZE;
 
@@ -587,6 +592,26 @@ public class PersistentStoreConfiguration implements Serializable {
         return walAutoArchiveAfterInactivity;
     }
 
+    /**
+     * Gets the order in which pages are written to disk storage during checkpoint.
+     *
+     * @return Checkpoint write order ({@link #DFLT_CHECKPOINT_WRITE_ORDER} unless changed through the setter).
+     */
+    public CheckpointWriteOrder getCheckpointWriteOrder() {
+        return checkpointWriteOrder;
+    }
+
+    /**
+     * Sets the order in which pages are written to disk storage during checkpoint.
+     * @param checkpointWriteOrder Checkpoint write order.
+     * @return {@code this} for chaining.
+     */
+    public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) {
+        this.checkpointWriteOrder = checkpointWriteOrder;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(PersistentStoreConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index 4698a6b..f6e577c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -95,4 +95,9 @@ public interface PageStore {
      * @throws IgniteCheckedException If sync failed (IO error occurred).
      */
     public void ensure() throws IgniteCheckedException;
+
+    /**
+     * @return Page store version.
+     */
+    public int version();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d147f36..351ec71 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -65,6 +64,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.PersistenceMetrics;
+import org.apache.ignite.configuration.CheckpointWriteOrder;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -384,11 +385,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
 
         if (persistenceCfg.getCheckpointingThreads() > 1)
-            asyncRunner = new ThreadPoolExecutor(
+            asyncRunner = new IgniteThreadPoolExecutor(
+                "checkpoint-runner",
+                cctx.igniteInstanceName(),
                 persistenceCfg.getCheckpointingThreads(),
                 persistenceCfg.getCheckpointingThreads(),
-                30L,
-                TimeUnit.SECONDS,
+                30_000,
                 new LinkedBlockingQueue<Runnable>()
             );
 
@@ -1301,8 +1303,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         boolean apply = status.needRestoreMemory();
 
         if (apply) {
-            U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore
memory state and " +
-                "enforce checkpoint on node start.");
+            U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore
memory state and " +
+                "finish checkpoint on node start.");
 
             cctx.pageStore().beginRecover();
         }
@@ -2082,12 +2084,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             WALPointer cpPtr = null;
 
-            GridMultiCollectionWrapper<FullPageId> cpPages;
-
             final CheckpointProgress curr;
 
+            IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>,
Integer> cpPagesTuple;
+
             tracker.onLockWaitStart();
 
+            boolean hasPages;
+
             checkpointLock.writeLock().lock();
 
             try {
@@ -2150,19 +2154,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 if (curr.nextSnapshot)
                     snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
 
-                IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>,
Integer> tup = beginAllCheckpoints();
-
-                // Todo it maybe more optimally
-                Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
-
-                for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
-                    for (int i = 0; i < col.collectionsSize(); i++)
-                        cpPagesList.addAll(col.innerCollection(i));
-                }
+                cpPagesTuple = beginAllCheckpoints();
 
-                cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
+                hasPages = hasPageForWrite(cpPagesTuple.get1());
 
-                if (!F.isEmpty(cpPages)) {
+                if (hasPages) {
                     // No page updates for this checkpoint are allowed from now on.
                     cpPtr = cctx.wal().log(cpRec);
 
@@ -2178,7 +2174,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             curr.cpBeginFut.onDone();
 
-            if (!F.isEmpty(cpPages)) {
+            if (hasPages) {
                 assert cpPtr != null;
 
                 // Sync log outside the checkpoint write lock.
@@ -2196,6 +2192,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 checkpointHist.addCheckpointEntry(cpEntry);
 
+                GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
+
                 if (printCheckpointStats)
                     if (log.isInfoEnabled())
                         log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s,
checkpointLockWait=%dms, " +
@@ -2227,6 +2225,24 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         }
 
         /**
+         * Tells whether there is anything to write, i.e. whether at least one of
+         * the given checkpoint page collections is not empty.
+         *
+         * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
+         * @return {@code true} if some collection contains at least one page.
+         */
+        private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
+            for (GridMultiCollectionWrapper<FullPageId> pages : cpPagesCollWrapper) {
+                // The first non-empty collection settles the answer.
+                if (!pages.isEmpty())
+                    return true;
+            }
+
+            // Nothing was passed in, or everything passed in was empty.
+            return false;
+        }
+
+        /**
          * @return tuple with collections of FullPageIds obtained from each PageMemory and
overall number of dirty pages.
          */
         private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>,
Integer> beginAllCheckpoints() {
@@ -2293,6 +2309,56 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         }
     }
 
+    /**
+     * Reorders list of checkpoint pages and splits them into needed number of sublists according to
+     * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and
+     * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}.
+     *
+     * @param cpPagesTuple Checkpoint pages tuple: page collections and overall number of dirty pages.
+     * @return Wrapper over the page sublists: one sublist for a single thread, {@code threads * 4} otherwise.
+     */
+    private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
+        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple
+    ) {
+        List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
+
+        for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
+            for (int i = 0; i < col.collectionsSize(); i++)
+                cpPagesList.addAll(col.innerCollection(i));
+        }
+
+        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+                @Override public int compare(FullPageId o1, FullPageId o2) {
+                    int cmp = Long.compare(o1.groupId(), o2.groupId());
+                    if (cmp != 0)
+                        return cmp;
+
+                    return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                        PageIdUtils.effectivePageId(o2.pageId()));
+                }
+            });
+        }
+
+        int cpThreads = persistenceCfg.getCheckpointingThreads();
+        // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
+        int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
+
+        Collection[] pagesSubListArr = new Collection[pagesSubLists];
+
+        int totalSize = cpPagesList.size();
+
+        for (int i = 0; i < pagesSubLists; i++) {
+            // Multiply in long: totalSize * (i + 1) may not fit into int for large checkpoints.
+            int from = (int)((long)totalSize * i / pagesSubLists);
+            int to = (int)((long)totalSize * (i + 1) / pagesSubLists);
+
+            pagesSubListArr[i] = cpPagesList.subList(from, to);
+        }
+
+        return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+    }
+
     /** Pages write task */
     private class WriteCheckpointPages implements Runnable {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index a7ca13c..e6c5379 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -45,10 +45,10 @@ public class FilePageStore implements PageStore {
     private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
 
     /** File version. */
-    private static final int VERSION = 1;
+    public static final int VERSION = 1;
 
     /** Allocated field offset. */
-    public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page
size*/;
+    static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
 
     /** */
     private final File cfgFile;
@@ -57,7 +57,7 @@ public class FilePageStore implements PageStore {
     private final byte type;
 
     /** Database configuration. */
-    private final MemoryConfiguration dbCfg;
+    protected final MemoryConfiguration dbCfg;
 
     /** Factory to provide I/O interfaces for read/write operations with files */
     private final FileIOFactory ioFactory;
@@ -103,20 +103,36 @@ public class FilePageStore implements PageStore {
 
     /** {@inheritDoc} */
     @Override public boolean exists() {
-        return cfgFile.exists() && cfgFile.length() > HEADER_SIZE;
+        return cfgFile.exists() && cfgFile.length() > headerSize();
     }
 
     /**
+     * @return Size of the page store header in bytes ({@link #HEADER_SIZE} in this implementation).
+     */
+    public int headerSize() {
+        return HEADER_SIZE;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int version() {
+        return VERSION;
+    }
+
+    /**
+     * Creates header for current version file store. Doesn't init the store.
+     *
      * @param type Type.
      * @param pageSize Page size.
      * @return Byte buffer instance.
      */
-    public static ByteBuffer header(byte type, int pageSize) {
-        ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+    public ByteBuffer header(byte type, int pageSize) {
+        ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN);
 
         hdr.putLong(SIGNATURE);
 
-        hdr.putInt(VERSION);
+        hdr.putInt(version());
 
         hdr.put(type);
 
@@ -142,7 +158,7 @@ public class FilePageStore implements PageStore {
         }
 
         //there is 'super' page in every file
-        return HEADER_SIZE + dbCfg.getPageSize();
+        return headerSize() + dbCfg.getPageSize();
     }
 
     /**
@@ -150,7 +166,7 @@ public class FilePageStore implements PageStore {
      */
     private long checkFile() throws IgniteCheckedException {
         try {
-            ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+            ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN);
 
             while (hdr.remaining() > 0)
                 fileIO.read(hdr);
@@ -166,9 +182,9 @@ public class FilePageStore implements PageStore {
 
             int ver = hdr.getInt();
 
-            if (VERSION != ver)
+            if (version() != ver)
                 throw new IgniteCheckedException("Failed to verify store file (invalid file
version)" +
-                    " [expectedVersion=" + VERSION +
+                    " [expectedVersion=" + version() +
                     ", fileVersion=" + ver + "]");
 
             byte type = hdr.get();
@@ -187,10 +203,10 @@ public class FilePageStore implements PageStore {
 
             long fileSize = cfgFile.length();
 
-            if (fileSize == HEADER_SIZE) // Every file has a special meta page.
-                fileSize = pageSize + HEADER_SIZE;
+            if (fileSize == headerSize()) // Every file has a special meta page.
+                fileSize = pageSize + headerSize();
 
-            if ((fileSize - HEADER_SIZE) % pageSize != 0)
+            if ((fileSize - headerSize()) % pageSize != 0)
                 throw new IgniteCheckedException("Failed to verify store file (invalid file
size)" +
                     " [fileSize=" + U.hexLong(fileSize) +
                     ", pageSize=" + U.hexLong(pageSize) + ']');
@@ -346,9 +362,9 @@ public class FilePageStore implements PageStore {
         init();
 
         try {
-            assert buf.remaining() == HEADER_SIZE;
+            assert buf.remaining() == headerSize();
 
-            int len = HEADER_SIZE;
+            int len = headerSize();
 
             long off = 0;
 
@@ -425,7 +441,7 @@ public class FilePageStore implements PageStore {
 
             long off = pageOffset(pageId);
 
-            assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE)
|| recover :
+            assert (off >= 0 && off + pageSize <= allocated.get() + headerSize())
|| recover :
                 "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ",
pageId=" + U.hexLong(pageId);
 
             assert pageBuf.capacity() == pageSize;
@@ -463,7 +479,7 @@ public class FilePageStore implements PageStore {
 
     /** {@inheritDoc} */
     @Override public long pageOffset(long pageId) {
-        return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE;
+        return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize();
     }
 
     /** {@inheritDoc} */
@@ -494,7 +510,7 @@ public class FilePageStore implements PageStore {
 
         long off = allocPage();
 
-        return off / pageSize;
+        return (off - headerSize()) / pageSize;
     }
 
     /**
@@ -519,6 +535,6 @@ public class FilePageStore implements PageStore {
         if (!inited)
             return 0;
 
-        return (int)(allocated.get() / pageSize);
+        return (int)(allocated.get() - headerSize()) / pageSize;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
new file mode 100644
index 0000000..d97ab26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+
+/**
+ * Factory that creates {@link FilePageStore} instances for page store files.
+ */
+public interface FilePageStoreFactory {
+    /**
+     * Creates instance of FilePageStore based on given file.
+     * @param type Data type, can be {@link PageIdAllocator#FLAG_IDX} or {@link PageIdAllocator#FLAG_DATA}.
+     * @param file Page store file.
+     * @return Page store instance for the given file.
+     */
+    public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index e2ad070..0041ea6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -365,21 +365,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter
implemen
         if (dirExisted && !idxFile.exists())
             grpsWithoutIdx.add(grpDesc.groupId());
 
-        FilePageStore idxStore = new FilePageStore(
-            PageMemory.FLAG_IDX,
-            idxFile,
-            pstCfg.getFileIOFactory(),
-            cctx.kernalContext().config().getMemoryConfiguration());
+        FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
+            pstCfg.getFileIOFactory(), igniteCfg.getMemoryConfiguration());
+
+        FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile);
 
         FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
 
         for (int partId = 0; partId < partStores.length; partId++) {
-            FilePageStore partStore = new FilePageStore(
-                PageMemory.FLAG_DATA,
-                new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)),
-                pstCfg.getFileIOFactory(),
-                cctx.kernalContext().config().getMemoryConfiguration()
-            );
+            FilePageStore partStore = pageStoreFactory.createPageStore(
+                PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE,
partId)));
 
             partStores[partId] = partStore;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
new file mode 100644
index 0000000..5d044ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import org.apache.ignite.configuration.MemoryConfiguration;
+
+/**
+ * Version 2 of {@link FilePageStore}: reports {@link #VERSION} and a header size equal to the configured page size.
+ */
+public class FilePageStoreV2 extends FilePageStore {
+    /** File version. */
+    public static final int VERSION = 2;
+
+    /** Header size, read from {@link MemoryConfiguration#getPageSize()} in the constructor. */
+    private final int hdrSize;
+
+    /**
+     * @param type Type.
+     * @param file File.
+     * @param factory Factory.
+     * @param cfg Config; its page size is used as the header size of this store.
+     */
+    public FilePageStoreV2(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) {
+        super(type, file, factory, cfg);
+
+        hdrSize = cfg.getPageSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int headerSize() {
+        return hdrSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int version() {
+        return VERSION;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
new file mode 100644
index 0000000..53bd802
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.MemoryConfiguration;
+
+/**
+ * Checks version in files if it's present on the disk, creates store with latest version otherwise.
+ */
+public class FileVersionCheckingFactory implements FilePageStoreFactory {
+    /** Property to override latest version. Should be used only in tests. */
+    public static final String LATEST_VERSION_OVERRIDE_PROPERTY = "file.page.store.latest.version.override";
+
+    /** Latest page store version. */
+    public final static int LATEST_VERSION = 2;
+
+    /** Factory to provide I/O interfaces for read/write operations with files. */
+    private final FileIOFactory fileIOFactory;
+
+    /** Memory configuration. */
+    private final MemoryConfiguration memCfg;
+
+    /**
+     * @param fileIOFactory File io factory.
+     * @param memCfg Memory configuration.
+     */
+    public FileVersionCheckingFactory(
+        FileIOFactory fileIOFactory, MemoryConfiguration memCfg) {
+        this.fileIOFactory = fileIOFactory;
+        this.memCfg = memCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException {
+        if (!file.exists())
+            return createPageStore(type, file, latestVersion());
+
+        try (FileIO fileIO = fileIOFactory.create(file, "r")) {
+            int minHdr = FilePageStore.HEADER_SIZE;
+
+            if (fileIO.size() < minHdr)
+                return createPageStore(type, file, latestVersion());
+
+            ByteBuffer hdr = ByteBuffer.allocate(minHdr).order(ByteOrder.LITTLE_ENDIAN);
+
+            while (hdr.remaining() > 0)
+                fileIO.read(hdr);
+
+            hdr.rewind();
+
+            hdr.getLong(); // Read signature
+
+            int ver = hdr.getInt();
+
+            return createPageStore(type, file, ver);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Error while creating file page store [file=" + file + "]:", e);
+        }
+    }
+
+    /**
+     * Resolves latest page store version.
+     */
+    public int latestVersion() {
+        int latestVer = LATEST_VERSION;
+
+        try {
+            latestVer = Integer.parseInt(System.getProperty(LATEST_VERSION_OVERRIDE_PROPERTY));
+        } catch (NumberFormatException e) {
+            // No override.
+        }
+
+        return latestVer;
+    }
+
+    /**
+     * Instantiates specific version of FilePageStore.
+     *
+     * @param type Type.
+     * @param file File.
+     * @param ver Version.
+     */
+    public FilePageStore createPageStore(byte type, File file, int ver) throws IgniteCheckedException {
+        switch (ver) {
+            case FilePageStore.VERSION:
+                return new FilePageStore(type, file, fileIOFactory, memCfg);
+
+            case FilePageStoreV2.VERSION:
+                return new FilePageStoreV2(type, file, fileIOFactory, memCfg);
+
+            default:
+                throw new IllegalArgumentException("Unknown version of file page store: " + ver);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index c248c35..11d5eef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -194,7 +194,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
 
         long size = fileIO.size();
 
-        fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
+        fileIO.write(ByteBuffer.allocate((int)size - filePageStore.headerSize()), filePageStore.headerSize());
 
         fileIO.force();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
new file mode 100644
index 0000000..9295000
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.CheckpointWriteOrder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+
+/**
+ *
+ */
+public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointingThreads(4)
+            .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int entriesCount() {
+        return 1000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ad99680/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
index a945c73..b39b8cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java
@@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
         MemoryConfiguration memCfg = new MemoryConfiguration();
         memCfg.setPageSize(1024);
-        memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
+        memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
 
         cfg.setMemoryConfiguration(memCfg);
 
@@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         super.afterTest();
     }
 
+    /** Entries count. */
+    protected int entriesCount() {
+        return 10;
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 cache.put(i, cacheName + i);
         }
 
@@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(cacheName + i, cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
 
         // Wait for expiration.
@@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 cache.put(i, new Person("" + i, cacheName));
         }
     }
@@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(new Person("" + i, cacheName), cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
     }
 
@@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
             List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll();
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(new Person("" + i, cacheName), persons.get(i).getValue());
 
-            assertEquals(10, persons.size());
+            assertEquals(entriesCount(), persons.size());
         }
     }
 
@@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)  {
+            for (int i = 0; i < entriesCount(); i++)  {
                 cache.put(i, cacheName + i);
 
                 assertEquals(cacheName + i, cache.get(i));
             }
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
 
         stopAllGrids();
@@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         for (String cacheName : caches) {
             IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < entriesCount(); i++)
                 assertEquals(cacheName + i, cache.get(i));
 
-            assertEquals(10, cache.size());
+            assertEquals(entriesCount(), cache.size());
         }
     }
 


Mime
View raw message