ignite-commits mailing list archives

From: agoncha...@apache.org
Subject: [36/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core
Date: Sun, 11 Jun 2017 20:04:02 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/RandomLruPageEvictionTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/RandomLruPageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/RandomLruPageEvictionTracker.java
new file mode 100644
index 0000000..035a91a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/RandomLruPageEvictionTracker.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.evict;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Data page eviction tracker that samples random pages and evicts the one with the oldest touch timestamp (random-LRU).
+ */
+public class RandomLruPageEvictionTracker extends PageAbstractEvictionTracker {
+    /** Evict attempts limit. */
+    private static final int EVICT_ATTEMPTS_LIMIT = 30;
+
+    /** LRU Sample size. */
+    private static final int SAMPLE_SIZE = 5;
+
+    /** Maximum sample search spin count. */
+    private static final int SAMPLE_SPIN_LIMIT = SAMPLE_SIZE * 1000;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Tracking array ptr. */
+    private long trackingArrPtr;
+
+    /**
+     * @param pageMem Page memory.
+     * @param plcCfg Policy config.
+     * @param sharedCtx Shared context.
+     */
+    public RandomLruPageEvictionTracker(
+        PageMemory pageMem,
+        MemoryPolicyConfiguration plcCfg,
+        GridCacheSharedContext<?, ?> sharedCtx
+    ) {
+        super((PageMemoryNoStoreImpl)pageMem, plcCfg, sharedCtx);
+
+        MemoryConfiguration memCfg = sharedCtx.kernalContext().config().getMemoryConfiguration();
+
+        assert plcCfg.getMaxSize() / memCfg.getPageSize() < Integer.MAX_VALUE;
+
+        log = sharedCtx.logger(getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        trackingArrPtr = GridUnsafe.allocateMemory(trackingSize * 4);
+
+        GridUnsafe.setMemory(trackingArrPtr, trackingSize * 4, (byte)0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        GridUnsafe.freeMemory(trackingArrPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void touchPage(long pageId) throws IgniteCheckedException {
+        int pageIdx = PageIdUtils.pageIndex(pageId);
+
+        long res = compactTimestamp(U.currentTimeMillis());
+
+        assert res >= 0 && res < Integer.MAX_VALUE;
+
+        GridUnsafe.putIntVolatile(null, trackingArrPtr + trackingIdx(pageIdx) * 4, (int)res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void evictDataPage() throws IgniteCheckedException {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        int evictAttemptsCnt = 0;
+
+        while (evictAttemptsCnt < EVICT_ATTEMPTS_LIMIT) {
+            int lruTrackingIdx = -1;
+
+            int lruCompactTs = Integer.MAX_VALUE;
+
+            int dataPagesCnt = 0;
+
+            int sampleSpinCnt = 0;
+
+            while (dataPagesCnt < SAMPLE_SIZE) {
+                int sampleTrackingIdx = rnd.nextInt(trackingSize);
+
+                int compactTs = GridUnsafe.getIntVolatile(null, trackingArrPtr + sampleTrackingIdx * 4);
+
+                if (compactTs != 0) {
+                    // We chose a data page that was touched at least once.
+                    if (compactTs < lruCompactTs) {
+                        lruTrackingIdx = sampleTrackingIdx;
+
+                        lruCompactTs = compactTs;
+                    }
+
+                    dataPagesCnt++;
+                }
+
+                sampleSpinCnt++;
+
+                if (sampleSpinCnt > SAMPLE_SPIN_LIMIT) {
+                    LT.warn(log, "Too many attempts to choose data page: " + SAMPLE_SPIN_LIMIT);
+
+                    return;
+                }
+            }
+
+            if (evictDataPage(pageIdx(lruTrackingIdx)))
+                return;
+
+            evictAttemptsCnt++;
+        }
+
+        LT.warn(log, "Too many failed attempts to evict page: " + EVICT_ATTEMPTS_LIMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean checkTouch(long pageId) {
+        int trackingIdx = trackingIdx(PageIdUtils.pageIndex(pageId));
+
+        int ts = GridUnsafe.getIntVolatile(null, trackingArrPtr + trackingIdx * 4);
+
+        return ts != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forgetPage(long pageId) {
+        int pageIdx = PageIdUtils.pageIndex(pageId);
+
+        GridUnsafe.putIntVolatile(null, trackingArrPtr + trackingIdx(pageIdx) * 4, 0);
+    }
+}

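A minimal standalone sketch of the sampled-LRU selection that evictDataPage() above performs over its off-heap timestamp array; a plain int[] stands in for the GridUnsafe tracking array and all names here are illustrative, not part of the Ignite API:

import java.util.concurrent.ThreadLocalRandom;

class RandomLruSampleSketch {
    /** Returns the index of the least recently touched slot in a random sample, or -1 if a full sample cannot be built. */
    static int selectLruIndex(int[] trackingArr, int sampleSize, int spinLimit) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        int lruIdx = -1;
        int lruTs = Integer.MAX_VALUE;
        int seen = 0;

        for (int spins = 0; seen < sampleSize; spins++) {
            if (spins > spinLimit)
                return -1; // Too many attempts: not enough touched pages to build the sample.

            int idx = rnd.nextInt(trackingArr.length);
            int ts = trackingArr[idx];

            // A non-zero compact timestamp means the page was touched at least once.
            if (ts != 0) {
                if (ts < lruTs) {
                    lruIdx = idx;
                    lruTs = ts;
                }

                seen++;
            }
        }

        return lruIdx;
    }

    public static void main(String[] args) {
        int[] tracking = {0, 42, 0, 7, 99, 0, 15};

        System.out.println("LRU candidate index: " + selectLruIndex(tracking, 3, 3 * 1000));
    }
}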
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
new file mode 100755
index 0000000..6ddc9fc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
+
+/**
+ * File page store.
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    private static final int VERSION = 1;
+
+    /** Page store header size (offset at which page data starts). */
+    public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** */
+    private final File cfgFile;
+
+    /** */
+    private final byte type;
+
+    /** Database configuration. */
+    private final MemoryConfiguration dbCfg;
+
+    /** */
+    private RandomAccessFile file;
+
+    /** */
+    private FileChannel ch;
+
+    /** */
+    private final AtomicLong allocated;
+
+    /** */
+    private final int pageSize;
+
+    /** */
+    private volatile boolean inited;
+
+    /** */
+    private volatile boolean recover;
+
+    /** */
+    private volatile int tag;
+
+    /** */
+    private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
+
+    /** */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * @param file File.
+     */
+    public FilePageStore(byte type, File file, MemoryConfiguration cfg) {
+        this.type = type;
+
+        cfgFile = file;
+        dbCfg = cfg;
+
+        allocated = new AtomicLong();
+
+        pageSize = dbCfg.getPageSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists() {
+        return cfgFile.exists() && cfgFile.length() > HEADER_SIZE;
+    }
+
+    /**
+     * @param type Type.
+     * @param pageSize Page size.
+     * @return Byte buffer instance.
+     */
+    public static ByteBuffer header(byte type, int pageSize) {
+        ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+
+        hdr.putLong(SIGNATURE);
+
+        hdr.putInt(VERSION);
+
+        hdr.put(type);
+
+        hdr.putInt(pageSize);
+
+        hdr.rewind();
+
+        return hdr;
+    }
+
+    /**
+     * Initializes a new store file by writing the header; returns the initial allocated size.
+     */
+    private long initFile() {
+        try {
+            ByteBuffer hdr = header(type, dbCfg.getPageSize());
+
+            while (hdr.remaining() > 0)
+                ch.write(hdr);
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to initialize store file: " + cfgFile.getAbsolutePath(), e);
+        }
+
+        //there is 'super' page in every file
+        return HEADER_SIZE + dbCfg.getPageSize();
+    }
+
+    /**
+     * Verifies the header of an existing store file and returns the file size.
+     */
+    private long checkFile() throws IgniteCheckedException {
+        try {
+            ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+
+            while (hdr.remaining() > 0)
+                ch.read(hdr);
+
+            hdr.rewind();
+
+            long signature = hdr.getLong();
+
+            if (SIGNATURE != signature)
+                throw new IgniteCheckedException("Failed to verify store file (invalid file signature)" +
+                    " [expectedSignature=" + U.hexLong(SIGNATURE) +
+                    ", actualSignature=" + U.hexLong(signature) + ']');
+
+            int ver = hdr.getInt();
+
+            if (VERSION != ver)
+                throw new IgniteCheckedException("Failed to verify store file (invalid file version)" +
+                    " [expectedVersion=" + VERSION +
+                    ", fileVersion=" + ver + "]");
+
+            byte type = hdr.get();
+
+            if (this.type != type)
+                throw new IgniteCheckedException("Failed to verify store file (invalid file type)" +
+                    " [expectedFileType=" + this.type +
+                    ", actualFileType=" + type + "]");
+
+            int pageSize = hdr.getInt();
+
+            if (dbCfg.getPageSize() != pageSize)
+                throw new IgniteCheckedException("Failed to verify store file (invalid page size)" +
+                    " [expectedPageSize=" + dbCfg.getPageSize() +
+                    ", filePageSize=" + pageSize + "]");
+
+            long fileSize = file.length();
+
+            if (fileSize == HEADER_SIZE) // Every file has a special meta page.
+                fileSize = pageSize + HEADER_SIZE;
+
+            if ((fileSize - HEADER_SIZE) % pageSize != 0)
+                throw new IgniteCheckedException("Failed to verify store file (invalid file size)" +
+                    " [fileSize=" + U.hexLong(fileSize) +
+                    ", pageSize=" + U.hexLong(pageSize) + ']');
+
+            return fileSize;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("File check failed", e);
+        }
+    }
+
+    /**
+     * @param cleanFile {@code True} to delete file.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void stop(boolean cleanFile) throws IgniteCheckedException {
+        lock.writeLock().lock();
+
+        try {
+            if (!inited)
+                return;
+
+            ch.force(false);
+
+            file.close();
+
+            if (cleanFile)
+                cfgFile.delete();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Truncates the store file and rewrites the header with a new partition tag.
+     */
+    public void truncate(int tag) throws IgniteCheckedException {
+        lock.writeLock().lock();
+
+        try {
+            if (!inited)
+                return;
+
+            this.tag = tag;
+
+            ch.position(0);
+
+            file.setLength(0);
+
+            allocated.set(initFile());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Switches the store into recovery mode (relaxes the write offset check).
+     */
+    public void beginRecover() {
+        lock.writeLock().lock();
+
+        try {
+            recover = true;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Finishes recovery and refreshes the allocated size from the file channel.
+     */
+    public void finishRecover() {
+        lock.writeLock().lock();
+
+        try {
+            if (inited)
+                allocated.set(ch.size());
+
+            recover = false;
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == ByteOrder.nativeOrder();
+
+            int len = pageSize;
+
+            do {
+                int n = ch.read(pageBuf, off);
+
+                // If page was not written yet, nothing to read.
+                if (n < 0) {
+                    pageBuf.put(new byte[pageBuf.remaining()]);
+
+                    return;
+                }
+
+                off += n;
+
+                len -= n;
+            }
+            while (len > 0);
+
+            int savedCrc32 = PageIO.getCrc(pageBuf);
+
+            PageIO.setCrc(pageBuf, 0);
+
+            pageBuf.position(0);
+
+            if (!skipCrc) {
+                int curCrc32 = PureJavaCrc32.calcCrc32(pageBuf, pageSize);
+
+                if ((savedCrc32 ^ curCrc32) != 0)
+                    throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
+                        "[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) +
+                        ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() +
+                        ", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]");
+            }
+
+            assert PageIO.getCrc(pageBuf) == 0;
+
+            if (keepCrc)
+                PageIO.setCrc(pageBuf, savedCrc32);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Read error", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readHeader(ByteBuffer buf) throws IgniteCheckedException {
+        init();
+
+        try {
+            assert buf.remaining() == HEADER_SIZE;
+
+            int len = HEADER_SIZE;
+
+            long off = 0;
+
+            do {
+                int n = ch.read(buf, off);
+
+                // If page was not written yet, nothing to read.
+                if (n < 0)
+                    return;
+
+                off += n;
+
+                len -= n;
+            }
+            while (len > 0);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Read error", e);
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed to initialize store file.
+     */
+    private void init() throws IgniteCheckedException {
+        if (!inited) {
+            lock.writeLock().lock();
+
+            try {
+                if (!inited) {
+                    RandomAccessFile rndFile = null;
+
+                    IgniteCheckedException err = null;
+
+                    try {
+                        file = rndFile = new RandomAccessFile(cfgFile, "rw");
+
+                        ch = file.getChannel();
+
+                        if (file.length() == 0)
+                            allocated.set(initFile());
+                        else
+                            allocated.set(checkFile());
+
+                        inited = true;
+                    }
+                    catch (IOException e) {
+                        throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e);
+                    }
+                    finally {
+                        if (err != null && rndFile != null)
+                            try {
+                                rndFile.close();
+                            }
+                            catch (IOException e) {
+                                err.addSuppressed(e);
+                            }
+                    }
+                }
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+        init();
+
+        lock.readLock().lock();
+
+        try {
+            if (tag < this.tag)
+                return;
+
+            long off = pageOffset(pageId);
+
+            assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover :
+                "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == ByteOrder.nativeOrder();
+            assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId);
+
+            int crc32 = skipCrc ? 0 : PureJavaCrc32.calcCrc32(pageBuf, pageSize);
+
+            PageIO.setCrc(pageBuf, crc32);
+
+            pageBuf.position(0);
+
+            int len = pageSize;
+
+            do {
+                int n = ch.write(pageBuf, off);
+
+                off += n;
+
+                len -= n;
+            }
+            while (len > 0);
+
+            PageIO.setCrc(pageBuf, 0);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to write the page to the file store [pageId=" + pageId +
+                ", file=" + cfgFile.getAbsolutePath() + ']', e);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long pageOffset(long pageId) {
+        return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sync() throws IgniteCheckedException {
+        lock.writeLock().lock();
+
+        try {
+            init();
+
+            ch.force(false);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Sync error", e);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void ensure() throws IgniteCheckedException {
+        init();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long allocatePage() throws IgniteCheckedException {
+        init();
+
+        long off = allocPage();
+
+        return off / pageSize;
+    }
+
+    /**
+     * Atomically reserves space for one more page and returns its offset.
+     */
+    private long allocPage() {
+        long off;
+
+        do {
+            off = allocated.get();
+
+            if (allocated.compareAndSet(off, off + pageSize))
+                break;
+        }
+        while (true);
+
+        return off;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int pages() {
+        if (!inited)
+            return 0;
+
+        return (int)(allocated.get() / pageSize);
+    }
+}

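The 17-byte header that FilePageStore writes via header() and validates in checkFile() packs, in little-endian order, an 8-byte signature, a 4-byte version, a 1-byte store type and a 4-byte page size. A self-contained sketch that builds and re-parses such a header; the signature and version constants are copied from the class above, everything else is illustrative:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class PageStoreHeaderSketch {
    static final long SIGNATURE = 0xF19AC4FE60C530B8L;
    static final int VERSION = 1;
    static final int HEADER_SIZE = 8 /*signature*/ + 4 /*version*/ + 1 /*type*/ + 4 /*page size*/;

    /** Builds a header buffer, mirroring FilePageStore.header(). */
    static ByteBuffer write(byte type, int pageSize) {
        ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);

        hdr.putLong(SIGNATURE).putInt(VERSION).put(type).putInt(pageSize);

        hdr.rewind();

        return hdr;
    }

    /** Re-reads the header fields and checks them, as checkFile() does. */
    static void verify(ByteBuffer hdr, byte expType, int expPageSize) {
        if (hdr.getLong() != SIGNATURE)
            throw new IllegalStateException("Invalid file signature");

        if (hdr.getInt() != VERSION)
            throw new IllegalStateException("Invalid file version");

        if (hdr.get() != expType)
            throw new IllegalStateException("Invalid file type");

        if (hdr.getInt() != expPageSize)
            throw new IllegalStateException("Invalid page size");
    }

    public static void main(String[] args) {
        verify(write((byte)1, 4096), (byte)1, 4096);

        System.out.println("Header round-trip OK, size = " + HEADER_SIZE + " bytes");
    }
}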
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
new file mode 100755
index 0000000..f908512
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager extends GridCacheSharedManagerAdapter implements IgnitePageStoreManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** */
+    public static final String INDEX_FILE_NAME = "index" + FILE_SUFFIX;
+
+    /** */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** */
+    public static final String CACHE_DIR_PREFIX = "cache-";
+
+    /** */
+    public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
+
+    /** */
+    public static final String CACHE_DATA_FILENAME = "cache_data.dat";
+
+    /** Marshaller. */
+    private static final Marshaller marshaller = new JdkMarshaller();
+
+    /** */
+    private final Map<Integer, CacheStoreHolder> idxCacheStores = new ConcurrentHashMap<>();
+
+    /** */
+    private final IgniteConfiguration igniteCfg;
+
+    /** */
+    private PersistentStoreConfiguration pstCfg;
+
+    /** Absolute directory for file page store */
+    private File storeWorkDir;
+
+    /** */
+    private final long metaPageId = PageIdUtils.pageId(-1, PageMemory.FLAG_IDX, 0);
+
+    /** */
+    private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public FilePageStoreManager(GridKernalContext ctx) {
+        igniteCfg = ctx.config();
+
+        PersistentStoreConfiguration pstCfg = igniteCfg.getPersistentStoreConfiguration();
+
+        assert pstCfg != null : "Page store manager should not be created if persistence is disabled.";
+
+        this.pstCfg = pstCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start0() throws IgniteCheckedException {
+        if (cctx.kernalContext().clientNode())
+            return;
+
+        String consId = U.maskForFileName(cctx.kernalContext().discovery().consistentId().toString());
+
+        if (pstCfg.getPersistentStorePath() != null) {
+            File workDir0 = new File(pstCfg.getPersistentStorePath());
+
+            if (!workDir0.isAbsolute())
+                workDir0 = U.resolveWorkDirectory(
+                    igniteCfg.getWorkDirectory(),
+                    pstCfg.getPersistentStorePath(),
+                    false
+                );
+
+            storeWorkDir = new File(workDir0, consId);
+        }
+        else
+            storeWorkDir = new File(U.resolveWorkDirectory(
+                igniteCfg.getWorkDirectory(),
+                "db",
+                false
+            ), consId);
+
+        U.ensureDirectory(storeWorkDir, "page store work directory", log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop0(boolean cancel) {
+        if (log.isDebugEnabled())
+            log.debug("Stopping page store manager.");
+
+        IgniteCheckedException ex = shutdown(false);
+
+        if (ex != null)
+            U.error(log, "Failed to gracefully stop page store manager", ex);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Activate page store manager [id=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        start0();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("DeActivate page store manager [id=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        stop0(true);
+
+        idxCacheStores.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beginRecover() {
+        for (CacheStoreHolder holder : idxCacheStores.values()) {
+            holder.idxStore.beginRecover();
+
+            for (FilePageStore partStore : holder.partStores)
+                partStore.beginRecover();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishRecover() {
+        for (CacheStoreHolder holder : idxCacheStores.values()) {
+            holder.idxStore.finishRecover();
+
+            for (FilePageStore partStore : holder.partStores)
+                partStore.finishRecover();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initializeForCache(CacheGroupDescriptor grpDesc, StoredCacheData cacheData)
+        throws IgniteCheckedException {
+        int grpId = grpDesc.groupId();
+
+        if (!idxCacheStores.containsKey(grpId)) {
+            CacheStoreHolder holder = initForCache(grpDesc, cacheData.config());
+
+            CacheStoreHolder old = idxCacheStores.put(grpId, holder);
+
+            assert old == null : "Non-null old store holder for cache: " + cacheData.config().getName();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void storeCacheData(
+        CacheGroupDescriptor grpDesc,
+        StoredCacheData cacheData
+    ) throws IgniteCheckedException {
+
+        File cacheWorkDir = cacheWorkDirectory(grpDesc, cacheData.config());
+        File file;
+
+        checkAndInitCacheWorkDir(cacheWorkDir);
+
+        assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir;
+
+        if (grpDesc.sharedGroup())
+            file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME);
+        else
+            file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
+
+        if (!file.exists() || file.length() == 0) {
+            try {
+                file.createNewFile();
+
+                try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) {
+                    marshaller.marshal(cacheData, stream);
+                }
+            }
+            catch (IOException ex) {
+                throw new IgniteCheckedException("Failed to persist cache configuration: " + cacheData.config().getName(), ex);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException {
+        grpsWithoutIdx.remove(grp.groupId());
+
+        CacheStoreHolder old = idxCacheStores.remove(grp.groupId());
+
+        assert old != null : "Missing cache store holder [cache=" + grp.cacheOrGroupName() +
+            ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.igniteInstanceName() + ']';
+
+        IgniteCheckedException ex = shutdown(old, /*clean files if destroy*/destroy, null);
+
+        if (ex != null)
+            throw ex;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
+        assert partId <= PageIdAllocator.MAX_PARTITION_ID;
+
+        PageStore store = getStore(grpId, partId);
+
+        assert store instanceof FilePageStore : store;
+
+        ((FilePageStore)store).truncate(tag);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void read(int cacheId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException {
+        read(cacheId, pageId, pageBuf, false);
+    }
+
+    /**
+     * Will preserve crc in buffer if keepCrc is true.
+     *
+     * @param cacheId Cache ID.
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer.
+     * @param keepCrc Keep CRC flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void read(int cacheId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
+        PageStore store = getStore(cacheId, PageIdUtils.partId(pageId));
+
+        store.read(pageId, pageBuf, keepCrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(int grpId, int partId) throws IgniteCheckedException {
+        PageStore store = getStore(grpId, partId);
+
+        return store.exists();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readHeader(int cacheId, int partId, ByteBuffer buf) throws IgniteCheckedException {
+        PageStore store = getStore(cacheId, partId);
+
+        store.readHeader(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int cacheId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+        writeInternal(cacheId, pageId, pageBuf, tag);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long pageOffset(int cacheId, long pageId) throws IgniteCheckedException {
+        PageStore store = getStore(cacheId, PageIdUtils.partId(pageId));
+
+        return store.pageOffset(pageId);
+    }
+
+    /**
+     * @param cacheId Cache ID to write.
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer.
+     * @param tag Partition tag (incremented on partition truncation; stale writes are skipped).
+     * @return PageStore to which the page has been written.
+     * @throws IgniteCheckedException If IO error occurred.
+     */
+    public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+        int partId = PageIdUtils.partId(pageId);
+
+        PageStore store = getStore(cacheId, partId);
+
+        store.write(pageId, pageBuf, tag);
+
+        return store;
+    }
+
+    /**
+     * @param grpDesc Cache group descriptor.
+     * @param ccfg Cache configuration.
+     * @return Cache work directory.
+     */
+    private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) {
+        String dirName;
+
+        if (grpDesc.sharedGroup())
+            dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
+        else
+            dirName = CACHE_DIR_PREFIX + ccfg.getName();
+
+        return new File(storeWorkDir, dirName);
+    }
+
+    /**
+     * @param grpDesc Cache group descriptor.
+     * @param ccfg Cache configuration.
+     * @return Cache store holder.
+     * @throws IgniteCheckedException If failed.
+     */
+    private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException {
+        assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName();
+
+        File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg);
+
+        boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
+
+        File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME);
+
+        if (dirExisted && !idxFile.exists())
+            grpsWithoutIdx.add(grpDesc.groupId());
+
+        FilePageStore idxStore = new FilePageStore(
+            PageMemory.FLAG_IDX,
+            idxFile,
+            cctx.kernalContext().config().getMemoryConfiguration());
+
+        FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
+
+        for (int partId = 0; partId < partStores.length; partId++) {
+            FilePageStore partStore = new FilePageStore(
+                PageMemory.FLAG_DATA,
+                new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)),
+                cctx.kernalContext().config().getMemoryConfiguration()
+            );
+
+            partStores[partId] = partStore;
+        }
+
+        return new CacheStoreHolder(idxStore, partStores);
+    }
+
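+    /**
+     * Creates the cache work directory if needed, handling a leftover snapshot restore lock.
+     *
+     * @param cacheWorkDir Cache work directory.
+     * @return {@code True} if the directory already existed.
+     * @throws IgniteCheckedException If the directory could not be initialized.
+     */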
+    private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws IgniteCheckedException {
+        boolean dirExisted = false;
+
+        if (!cacheWorkDir.exists()) {
+            boolean res = cacheWorkDir.mkdirs();
+
+            if (!res)
+                throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                    "(failed to create, make sure the work folder has correct permissions): " +
+                    cacheWorkDir.getAbsolutePath());
+        }
+        else {
+            if (cacheWorkDir.isFile())
+                throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                    "(a file with the same name already exists): " + cacheWorkDir.getAbsolutePath());
+
+            File lockF = new File(cacheWorkDir, IgniteCacheSnapshotManager.SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME);
+
+            if (lockF.exists()) {
+                Path tmp = cacheWorkDir.toPath().getParent().resolve(cacheWorkDir.getName() + ".tmp");
+
+                boolean deleted = U.delete(cacheWorkDir);
+
+                if (Files.exists(tmp) && Files.isDirectory(tmp)) {
+                    U.warn(log, "Ignite node crashed during the snapshot restore process " +
+                        "(there is a snapshot restore lock file left for cache), but an old version of the cache " +
+                        "directory was preserved. Trying to restore it [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']');
+
+                    try {
+                        Files.move(tmp, cacheWorkDir.toPath());
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException(e);
+                    }
+                }
+                else {
+                    U.warn(log, "Ignite node crashed during the snapshot restore process " +
+                        "(there is a snapshot restore lock file left for cache). Will remove both the lock file and " +
+                        "incomplete cache directory [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']');
+
+                    if (!deleted)
+                        throw new IgniteCheckedException("Failed to remove obsolete cache working directory " +
+                            "(remove the directory manually and make sure the work folder has correct permissions): " +
+                            cacheWorkDir.getAbsolutePath());
+
+                    cacheWorkDir.mkdirs();
+                }
+
+                if (!cacheWorkDir.exists())
+                    throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                        "(failed to create, make sure the work folder has correct permissions): " +
+                        cacheWorkDir.getAbsolutePath());
+            }
+            else
+                dirExisted = true;
+        }
+
+        return dirExisted;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sync(int cacheId, int partId) throws IgniteCheckedException {
+        getStore(cacheId, partId).sync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensure(int cacheId, int partId) throws IgniteCheckedException {
+        getStore(cacheId, partId).ensure();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long allocatePage(int cacheId, int partId, byte flags) throws IgniteCheckedException {
+        assert partId <= PageIdAllocator.MAX_PARTITION_ID || partId == PageIdAllocator.INDEX_PARTITION;
+
+        PageStore store = getStore(cacheId, partId);
+
+        long pageIdx = store.allocatePage();
+
+        return PageIdUtils.pageId(partId, flags, (int)pageIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long metaPageId(final int cacheId) {
+        return metaPageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int pages(int cacheId, int partId) throws IgniteCheckedException {
+        PageStore store = getStore(cacheId, partId);
+
+        return store.pages();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException {
+        if (cctx.kernalContext().clientNode())
+            return Collections.emptyMap();
+
+        File[] files = storeWorkDir.listFiles();
+
+        if (files == null)
+            return Collections.emptyMap();
+
+        Map<String, StoredCacheData> ccfgs = new HashMap<>();
+
+        for (File file : files) {
+            if (file.isDirectory()) {
+                if (file.getName().startsWith(CACHE_DIR_PREFIX)) {
+                    File conf = new File(file, CACHE_DATA_FILENAME);
+
+                    if (conf.exists() && conf.length() > 0) {
+                        StoredCacheData cacheData = readCacheData(conf);
+
+                        ccfgs.put(cacheData.config().getName(), cacheData);
+                    }
+                }
+                else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+                    readCacheGroupCaches(file, ccfgs);
+            }
+        }
+
+        return ccfgs;
+    }
+
+    /**
+     * @param grpDir Group directory.
+     * @param ccfgs Cache configurations.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void readCacheGroupCaches(File grpDir, Map<String, StoredCacheData> ccfgs) throws IgniteCheckedException {
+        File[] files = grpDir.listFiles();
+
+        if (files == null)
+            return;
+
+        for (File file : files) {
+            if (!file.isDirectory() && file.getName().endsWith(CACHE_DATA_FILENAME) && file.length() > 0) {
+                StoredCacheData cacheData = readCacheData(file);
+
+                ccfgs.put(cacheData.config().getName(), cacheData);
+            }
+        }
+    }
+
+    /**
+     * @param conf File with stored cache data.
+     * @return Cache data.
+     * @throws IgniteCheckedException If failed.
+     */
+    private StoredCacheData readCacheData(File conf) throws IgniteCheckedException {
+        try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
+            return marshaller.unmarshal(stream, U.resolveClassLoader(igniteCfg));
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to read cache configuration from disk for cache: " +
+                conf.getAbsolutePath(), e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasIndexStore(int grpId) {
+        return !grpsWithoutIdx.contains(grpId);
+    }
+
+    /**
+     * @return Store work dir.
+     */
+    public File workDir() {
+        return storeWorkDir;
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @return Store dir for given cache.
+     */
+    public File cacheWorkDir(CacheConfiguration ccfg) {
+        String dirName = ccfg.getGroupName() == null ?
+            CACHE_DIR_PREFIX + ccfg.getName() : CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
+
+        return new File(storeWorkDir, dirName);
+    }
+
+    /**
+     * @param cleanFiles {@code True} if the stores should delete their files upon close.
+     */
+    private IgniteCheckedException shutdown(boolean cleanFiles) {
+        IgniteCheckedException ex = null;
+
+        for (CacheStoreHolder holder : idxCacheStores.values())
+            ex = shutdown(holder, cleanFiles, ex);
+
+        return ex;
+    }
+
+    /**
+     * @param holder Store holder.
+     * @param cleanFile {@code True} if files should be cleaned.
+     * @param aggr Aggregating exception.
+     * @return Aggregating exception, if error occurred.
+     */
+    private IgniteCheckedException shutdown(CacheStoreHolder holder, boolean cleanFile,
+        @Nullable IgniteCheckedException aggr) {
+        aggr = shutdown(holder.idxStore, cleanFile, aggr);
+
+        for (FilePageStore store : holder.partStores) {
+            if (store != null)
+                aggr = shutdown(store, cleanFile, aggr);
+        }
+
+        return aggr;
+    }
+
+    /**
+     * @param store Store to shutdown.
+     * @param cleanFile {@code True} if files should be cleaned.
+     * @param aggr Aggregating exception.
+     * @return Aggregating exception, if error occurred.
+     */
+    private IgniteCheckedException shutdown(FilePageStore store, boolean cleanFile, IgniteCheckedException aggr) {
+        try {
+            if (store != null)
+                store.stop(cleanFile);
+        }
+        catch (IgniteCheckedException e) {
+            if (aggr == null)
+                aggr = new IgniteCheckedException("Failed to gracefully shutdown store");
+
+            aggr.addSuppressed(e);
+        }
+
+        return aggr;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param partId Partition ID.
+     * @return Page store for the corresponding parameters.
+     * @throws IgniteCheckedException If cache or partition with the given ID was not created.
+     *
+     * Note: visible for testing.
+     */
+    public PageStore getStore(int grpId, int partId) throws IgniteCheckedException {
+        CacheStoreHolder holder = idxCacheStores.get(grpId);
+
+        if (holder == null)
+            throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
+                "(cache has not been started): " + grpId);
+
+        if (partId == PageIdAllocator.INDEX_PARTITION)
+            return holder.idxStore;
+
+        if (partId > PageIdAllocator.MAX_PARTITION_ID)
+            throw new IgniteCheckedException("Partition ID is reserved: " + partId);
+
+        FilePageStore store = holder.partStores[partId];
+
+        if (store == null)
+            throw new IgniteCheckedException("Failed to get page store for the given partition ID " +
+                "(partition has not been created) [grpId=" + grpId + ", partId=" + partId + ']');
+
+        return store;
+    }
+
+    /**
+     *
+     */
+    private static class CacheStoreHolder {
+        /** Index store. */
+        private final FilePageStore idxStore;
+
+        /** Partition stores. */
+        private final FilePageStore[] partStores;
+
+        /**
+         * @param idxStore Index store.
+         * @param partStores Partition stores.
+         */
+        public CacheStoreHolder(FilePageStore idxStore, FilePageStore[] partStores) {
+            this.idxStore = idxStore;
+            this.partStores = partStores;
+        }
+    }
+
+}

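FilePageStoreManager resolves every store file from a handful of naming constants: a per-node work directory (db/<consistentId> by default), one cache-<name> or cacheGroup-<groupName> subdirectory per cache group, and inside it cache_data.dat, index.bin and part-<N>.bin. A small sketch that composes those paths; the constants are copied from the class above, while the helper itself is illustrative rather than an Ignite API:

import java.io.File;

class StorePathSketch {
    static final String FILE_SUFFIX = ".bin";
    static final String PART_FILE_TEMPLATE = "part-%d" + FILE_SUFFIX;
    static final String INDEX_FILE_NAME = "index" + FILE_SUFFIX;
    static final String CACHE_DIR_PREFIX = "cache-";
    static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";

    /** Mirrors cacheWorkDir(): shared groups use the group name, standalone caches the cache name. */
    static File cacheWorkDir(File storeWorkDir, String cacheName, String grpName) {
        String dirName = grpName == null ? CACHE_DIR_PREFIX + cacheName : CACHE_GRP_DIR_PREFIX + grpName;

        return new File(storeWorkDir, dirName);
    }

    public static void main(String[] args) {
        File workDir = new File("db", "node-consistent-id"); // Illustrative node directory.
        File cacheDir = cacheWorkDir(workDir, "myCache", null);

        System.out.println(new File(cacheDir, INDEX_FILE_NAME));                      // e.g. db/node-consistent-id/cache-myCache/index.bin
        System.out.println(new File(cacheDir, String.format(PART_FILE_TEMPLATE, 3))); // e.g. db/node-consistent-id/cache-myCache/part-3.bin
    }
}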
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
new file mode 100644
index 0000000..d2f0099
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+
+/**
+ * Free list of data pages.
+ */
+public interface FreeList {
+    /**
+     * @param row Row.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void insertDataRow(CacheDataRow row) throws IgniteCheckedException;
+
+    /**
+     * @param link Row link.
+     * @param row New row data.
+     * @return {@code True} if was able to update row.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException;
+
+    /**
+     * @param link Row link.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeDataRowByLink(long link) throws IgniteCheckedException;
+
+    /**
+     * @param log Logger.
+     */
+    public void dumpStatistics(IgniteLogger log);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
new file mode 100644
index 0000000..4e8b8d8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * {@link FreeList} implementation based on {@link PagesList} buckets; also serves as a {@link ReuseList}.
+ */
+public class FreeListImpl extends PagesList implements FreeList, ReuseList {
+    /** */
+    private static final int BUCKETS = 256; // Must be power of 2.
+
+    /** */
+    private static final int REUSE_BUCKET = BUCKETS - 1;
+
+    /** */
+    private static final Integer COMPLETE = Integer.MAX_VALUE;
+
+    /** */
+    private static final Integer FAIL_I = Integer.MIN_VALUE;
+
+    /** */
+    private static final Long FAIL_L = Long.MAX_VALUE;
+
+    /** */
+    private static final int MIN_PAGE_FREE_SPACE = 8;
+
+    /** */
+    private final int shift;
+
+    /** */
+    private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);
+
+    /** */
+    private final int MIN_SIZE_FOR_DATA_PAGE;
+
+    /** */
+    private final int emptyDataPagesBucket;
+
+    /** */
+    private final PageHandler<CacheDataRow, Boolean> updateRow = new UpdateRowHandler();
+
+    /** */
+    private final MemoryMetricsImpl memMetrics;
+
+    /** */
+    private final PageEvictionTracker evictionTracker;
+
+    /**
+     *
+     */
+    private final class UpdateRowHandler extends PageHandler<CacheDataRow, Boolean> {
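+        /** {@inheritDoc} */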
+        @Override public Boolean run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            CacheDataRow row,
+            int itemId)
+            throws IgniteCheckedException {
+            DataPageIO io = (DataPageIO)iox;
+
+            int rowSize = getRowSize(row);
+
+            boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
+
+            evictionTracker.touchPage(pageId);
+
+            if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
+                // TODO This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[rowSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
+
+                assert data.payloadSize() == rowSize;
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+                wal.log(new DataPageUpdateRecord(
+                    cacheId,
+                    pageId,
+                    itemId,
+                    payload));
+            }
+
+            return updated;
+        }
+    }
+
+    /** */
+    private final PageHandler<CacheDataRow, Integer> writeRow = new WriteRowHandler();
+
+    /**
+     *
+     */
+    private final class WriteRowHandler extends PageHandler<CacheDataRow, Integer> {
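+        /** {@inheritDoc} */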
+        @Override public Integer run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            CacheDataRow row,
+            int written)
+            throws IgniteCheckedException {
+            DataPageIO io = (DataPageIO)iox;
+
+            int rowSize = getRowSize(row);
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace > 0 : oldFreeSpace;
+
+            // If the full row does not fit into this page write only a fragment.
+            written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
+                addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
+
+            // Reread free space after update.
+            int newFreeSpace = io.getFreeSpace(pageAddr);
+
+            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+                int bucket = bucket(newFreeSpace, false);
+
+                put(null, pageId, page, pageAddr, bucket);
+            }
+
+            if (written == rowSize)
+                evictionTracker.touchPage(pageId);
+
+            // Return the preallocated COMPLETE instance in the usual case to avoid boxing garbage.
+            return written == rowSize ? COMPLETE : written;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param page Page pointer.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param rowSize Row size.
+         * @return Written size which is always equal to row size here.
+         * @throws IgniteCheckedException If failed.
+         */
+        private int addRow(
+            long pageId,
+            long page,
+            long pageAddr,
+            DataPageIO io,
+            CacheDataRow row,
+            int rowSize
+        ) throws IgniteCheckedException {
+            io.addRow(pageAddr, row, rowSize, pageSize());
+
+            if (needWalDeltaRecord(pageId, page, null)) {
+                // TODO This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[rowSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+                assert data.payloadSize() == rowSize;
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+                wal.log(new DataPageInsertRecord(
+                    cacheId,
+                    pageId,
+                    payload));
+            }
+
+            return rowSize;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param page Page pointer.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param written Written size.
+         * @param rowSize Row size.
+         * @return Updated written size.
+         * @throws IgniteCheckedException If failed.
+         */
+        private int addRowFragment(
+            long pageId,
+            long page,
+            long pageAddr,
+            DataPageIO io,
+            CacheDataRow row,
+            int written,
+            int rowSize
+        ) throws IgniteCheckedException {
+            // Read the last link before writing the fragment, because the write will update it.
+            long lastLink = row.link();
+
+            int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
+
+            assert payloadSize > 0 : payloadSize;
+
+            if (needWalDeltaRecord(pageId, page, null)) {
+                // TODO This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[payloadSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
+
+                wal.log(new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink));
+            }
+
+            return written + payloadSize;
+        }
+    }
+
+    /** Handler for row removal. */
+    private final PageHandler<Void, Long> rmvRow = new RemoveRowHandler();
+
+    /**
+     * Handler that removes a row from a data page and returns the link to the next fragment, if any.
+     */
+    private final class RemoveRowHandler extends PageHandler<Void, Long> {
+        @Override public Long run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            Void ignored,
+            int itemId)
+            throws IgniteCheckedException {
+            DataPageIO io = (DataPageIO)iox;
+
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace >= 0 : oldFreeSpace;
+
+            long nextLink = io.removeRow(pageAddr, itemId, pageSize());
+
+            if (needWalDeltaRecord(pageId, page, walPlc))
+                wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));
+
+            int newFreeSpace = io.getFreeSpace(pageAddr);
+
+            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+                int newBucket = bucket(newFreeSpace, false);
+
+                if (oldFreeSpace > MIN_PAGE_FREE_SPACE) {
+                    int oldBucket = bucket(oldFreeSpace, false);
+
+                    if (oldBucket != newBucket) {
+                        // The page may have been concurrently taken for a put; in that case the put will handle the bucket change.
+                        if (removeDataPage(pageId, page, pageAddr, io, oldBucket))
+                            put(null, pageId, page, pageAddr, newBucket);
+                    }
+                }
+                else
+                    put(null, pageId, page, pageAddr, newBucket);
+
+                if (io.isEmpty(pageAddr))
+                    evictionTracker.forgetPage(pageId);
+            }
+
+            // In the common case the boxed 0L is cached by Long.valueOf, so no garbage is produced.
+            return nextLink;
+        }
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param name Name (for debug purpose).
+     * @param memMetrics Memory metrics.
+     * @param memPlc Memory policy.
+     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
+     * @param wal Write ahead log manager.
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if new metadata should be initialized.
+     * @throws IgniteCheckedException If failed.
+     */
+    public FreeListImpl(
+        int cacheId,
+        String name,
+        MemoryMetricsImpl memMetrics,
+        MemoryPolicy memPlc,
+        ReuseList reuseList,
+        IgniteWriteAheadLogManager wal,
+        long metaPageId,
+        boolean initNew) throws IgniteCheckedException {
+        super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId);
+        this.evictionTracker = memPlc.evictionTracker();
+        this.reuseList = reuseList == null ? this : reuseList;
+        int pageSize = pageMem.pageSize();
+
+        assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
+        assert U.isPow2(BUCKETS);
+        assert BUCKETS <= pageSize : pageSize;
+
+        // TODO this constant is used because currently we cannot reuse data pages as index pages
+        // TODO and vice-versa. It should be removed when data storage format is finalized.
+        MIN_SIZE_FOR_DATA_PAGE = pageSize - DataPageIO.MIN_DATA_PAGE_OVERHEAD;
+
+        int shift = 0;
+
+        while (pageSize > BUCKETS) {
+            shift++;
+            pageSize >>>= 1;
+        }
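+
+        // Worked example (illustrative values only, assuming a 4096-byte page and BUCKETS == 256):
+        // the loop above halves 4096 four times before it reaches 256, so shift == 4 and each
+        // bucket then covers a 16-byte (1 << shift) range of page free space.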
+
+        this.shift = shift;
+
+        this.memMetrics = memMetrics;
+
+        emptyDataPagesBucket = bucket(MIN_SIZE_FOR_DATA_PAGE, false);
+
+        init(metaPageId, initNew);
+    }
+
+    /**
+     * Calculates the average fill factor over this {@code FreeListImpl} instance.
+     *
+     * @return Fill factor in the {@code [0, 1]} range, or {@code -1} if no pages are tracked yet.
+     */
+    public float fillFactor() {
+        long pageSize = pageSize();
+
+        long totalSize = 0;
+        long loadSize = 0;
+
+        for (int b = BUCKETS - 2; b > 0; b--) {
+            long bsize = pageSize - ((REUSE_BUCKET - b) << shift);
+
+            long pages = bucketsSize[b].longValue();
+
+            loadSize += pages * (pageSize - bsize);
+
+            totalSize += pages * pageSize;
+        }
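+
+        // A reading of the loop above: pages in bucket b are assumed to hold roughly
+        // ((REUSE_BUCKET - b) << shift) bytes of data, because the bucket index encodes free space
+        // in (1 << shift)-byte steps; the -1 returned below means no pages are tracked yet.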
+
+        return totalSize == 0 ? -1L : ((float) loadSize / totalSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dumpStatistics(IgniteLogger log) {
+        long dataPages = 0;
+
+        final boolean dumpBucketsInfo = false;
+
+        for (int b = 0; b < BUCKETS; b++) {
+            long size = bucketsSize[b].longValue();
+
+            if (!isReuseBucket(b))
+                dataPages += size;
+
+            if (dumpBucketsInfo) {
+                Stripe[] stripes = getBucket(b);
+
+                boolean empty = true;
+
+                if (stripes != null) {
+                    for (Stripe stripe : stripes) {
+                        if (!stripe.empty) {
+                            empty = false;
+
+                            break;
+                        }
+                    }
+                }
+
+                log.info("Bucket [b=" + b +
+                    ", size=" + size +
+                    ", stripes=" + (stripes != null ? stripes.length : 0) +
+                    ", stripesEmpty=" + empty + ']');
+            }
+        }
+
+        if (dataPages > 0) {
+            log.info("FreeList [name=" + name +
+                ", buckets=" + BUCKETS +
+                ", dataPages=" + dataPages +
+                ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
+        }
+    }
+
+    /**
+     * @param freeSpace Page free space.
+     * @param allowReuse {@code True} if it is allowed to get reuse bucket.
+     * @return Bucket.
+     */
+    private int bucket(int freeSpace, boolean allowReuse) {
+        assert freeSpace > 0 : freeSpace;
+
+        int bucket = freeSpace >>> shift;
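+
+        // Example (assuming shift == 4): freeSpace == 1000 maps to bucket 1000 >>> 4 == 62, so all
+        // pages whose free space falls into the same (1 << shift)-byte range share a bucket.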
+
+        assert bucket >= 0 && bucket < BUCKETS : bucket;
+
+        if (!allowReuse && isReuseBucket(bucket))
+            bucket--;
+
+        return bucket;
+    }
+
+    /**
+     * @param part Partition.
+     * @return Page ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long allocateDataPage(int part) throws IgniteCheckedException {
+        assert part <= PageIdAllocator.MAX_PARTITION_ID;
+        assert part != PageIdAllocator.INDEX_PARTITION;
+
+        return pageMem.allocatePage(cacheId, part, PageIdAllocator.FLAG_DATA);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void insertDataRow(CacheDataRow row) throws IgniteCheckedException {
+        int rowSize = getRowSize(row);
+
+        int written = 0;
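+
+        // Sketch of the loop below (sizes are illustrative): with roughly 4000 usable bytes per data
+        // page, a 10000-byte row is written as three fragments on three pages, and every pass after
+        // the first one is counted as a "large entries" page in the memory metrics.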
+
+        do {
+            if (written != 0)
+                memMetrics.incrementLargeEntriesPages();
+
+            int freeSpace = Math.min(MIN_SIZE_FOR_DATA_PAGE, rowSize - written);
+
+            long pageId = 0L;
+
+            if (freeSpace == MIN_SIZE_FOR_DATA_PAGE)
+                pageId = takeEmptyPage(emptyDataPagesBucket, DataPageIO.VERSIONS);
+
+            boolean reuseBucket = false;
+
+            // TODO: properly handle reuse bucket.
+            if (pageId == 0L) {
+                for (int b = bucket(freeSpace, false) + 1; b < BUCKETS - 1; b++) {
+                    pageId = takeEmptyPage(b, DataPageIO.VERSIONS);
+
+                    if (pageId != 0L) {
+                        reuseBucket = isReuseBucket(b);
+
+                        break;
+                    }
+                }
+            }
+
+            boolean allocated = pageId == 0L;
+
+            if (allocated)
+                pageId = allocateDataPage(row.partition());
+
+            DataPageIO init = reuseBucket || allocated ? DataPageIO.VERSIONS.latest() : null;
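+
+            // Freshly allocated pages and pages taken from the reuse bucket have no valid data page
+            // layout yet, so they are initialized with the latest DataPageIO version before writing.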
+
+            written = write(pageId, writeRow, init, row, written, FAIL_I);
+
+            assert written != FAIL_I; // We can't fail here.
+        }
+        while (written != COMPLETE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException {
+        assert link != 0;
+
+        long pageId = PageIdUtils.pageId(link);
+        int itemId = PageIdUtils.itemId(link);
+
+        Boolean updated = write(pageId, updateRow, row, itemId, null);
+
+        assert updated != null; // Can't fail here.
+
+        return updated;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
+        assert link != 0;
+
+        long pageId = PageIdUtils.pageId(link);
+        int itemId = PageIdUtils.itemId(link);
+
+        long nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+        assert nextLink != FAIL_L; // Can't fail here.
+
+        while (nextLink != 0L) {
+            memMetrics.decrementLargeEntriesPages();
+
+            itemId = PageIdUtils.itemId(nextLink);
+            pageId = PageIdUtils.pageId(nextLink);
+
+            nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+            assert nextLink != FAIL_L; // Can't fail here.
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Stripe[] getBucket(int bucket) {
+        return buckets.get(bucket);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
+        return buckets.compareAndSet(bucket, exp, upd);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isReuseBucket(int bucket) {
+        return bucket == REUSE_BUCKET;
+    }
+
+    /**
+     * @return Number of empty data pages in free list.
+     */
+    public int emptyDataPages() {
+        return bucketsSize[emptyDataPagesBucket].intValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
+        assert reuseList == this : "this free list is not allowed to act as a reuse list";
+
+        put(bag, 0, 0, 0L, REUSE_BUCKET);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long takeRecycledPage() throws IgniteCheckedException {
+        assert reuseList == this : "this free list is not allowed to act as a reuse list";
+
+        return takeEmptyPage(REUSE_BUCKET, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long recycledPagesCount() throws IgniteCheckedException {
+        assert reuseList == this : "this free list is not allowed to act as a reuse list";
+
+        return storedPagesCount(REUSE_BUCKET);
+    }
+
+    /**
+     * @param row Row.
+     * @return Entry size on page.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static int getRowSize(CacheDataRow row) throws IgniteCheckedException {
+        int keyLen = row.key().valueBytesLength(null);
+        int valLen = row.value().valueBytesLength(null);
+
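+        // Presumably key + value + version, plus 8 bytes for the per-entry expire time and, when the
+        // row carries a cache ID (cacheId != 0), 4 more bytes for that ID.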
+        return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (row.cacheId() == 0 ? 0 : 4);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "FreeList [name=" + name + ']';
+    }
+}

