ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [34/43] ignite git commit: IEP-4 Baseline topology for persistent caches (Phase 1) Contributed by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com> Dmitry Pavlov <dpavlov.spb@gmail.com> Eduard Shangareev <eduard.shangareev@gmail.com> Ily
Date Wed, 17 Jan 2018 12:23:32 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
new file mode 100644
index 0000000..7daef3c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Index storage: maps UTF-8 encoded index names to root page IDs, kept in a meta tree.
+ */
+public class IndexStorageImpl implements IndexStorage {
+    /** Max index name length (bytes num) */
+    public static final int MAX_IDX_NAME_LEN = 255;
+
+    /** Reserved size for index name. Needed for backward compatibility. */
+    public static final int RESERVED_IDX_NAME_LEN = 768;
+
+    /** Size in bytes of the one-byte name-length prefix that precedes each stored index name. */
+    private static final int BYTE_LEN = 1;
+
+    /** Page memory. */
+    private final PageMemory pageMem;
+
+    /** Index tree. */
+    private final MetaTree metaTree;
+
+    /** Reuse list used to obtain recycled pages; may be {@code null}, in which case pages are allocated and freed directly. */
+    private final ReuseList reuseList;
+
+    /** Cache group ID. */
+    private final int grpId;
+
+    /** */
+    private final int allocPartId;
+
+    /** */
+    private final byte allocSpace;
+
+    /** Creates the storage and builds its meta tree; a checked failure is rethrown as {@link IgniteException}.
+     * @param pageMem Page memory.
+     * @param wal Write ahead log manager.
+     */
+    public IndexStorageImpl(
+        final PageMemory pageMem,
+        final IgniteWriteAheadLogManager wal,
+        final AtomicLong globalRmvId, // Passed through to the meta tree unchanged.
+        final int grpId, // Cache group ID.
+        final int allocPartId, // Passed to pageMem.allocatePage() together with grpId and allocSpace.
+        final byte allocSpace, // Passed to pageMem.allocatePage() together with grpId and allocPartId.
+        final ReuseList reuseList, // May be null; every use of the list in this class is null-checked.
+        final long rootPageId, // Becomes the meta tree's meta page ID.
+        final boolean initNew // Forwarded to the meta tree's initTree() call.
+    ) {
+        try {
+            this.pageMem = pageMem;
+            this.grpId = grpId;
+            this.allocPartId = allocPartId;
+            this.allocSpace = allocSpace;
+            this.reuseList = reuseList;
+
+            metaTree = new MetaTree(grpId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId,
+                reuseList, MetaStoreInnerIO.VERSIONS, MetaStoreLeafIO.VERSIONS, initNew);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public RootPage getOrAllocateForTree(final String idxName) throws IgniteCheckedException {
+        final MetaTree tree = metaTree;
+
+        synchronized (this) {
+            byte[] idxNameBytes = idxName.getBytes(StandardCharsets.UTF_8);
+
+            if (idxNameBytes.length > MAX_IDX_NAME_LEN)
+                throw new IllegalArgumentException("Too long encoded indexName [maxAllowed=" + MAX_IDX_NAME_LEN +
+                    ", currentLength=" + idxNameBytes.length + ", name=" + idxName + "]");
+
+            final IndexItem row = tree.findOne(new IndexItem(idxNameBytes, 0));
+
+            if (row == null) {
+                long pageId = 0;
+
+                if (reuseList != null)
+                    pageId = reuseList.takeRecycledPage();
+
+                pageId = pageId == 0 ? pageMem.allocatePage(grpId, allocPartId, allocSpace) : pageId;
+
+                tree.put(new IndexItem(idxNameBytes, pageId));
+
+                return new RootPage(new FullPageId(pageId, grpId), true);
+            }
+            else {
+                final FullPageId pageId = new FullPageId(row.pageId, grpId);
+
+                return new RootPage(pageId, false);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public RootPage dropRootPage(final String idxName) throws IgniteCheckedException {
+        final IndexItem removed = metaTree.remove(new IndexItem(idxName.getBytes(StandardCharsets.UTF_8), 0));
+
+        // No entry under this name: nothing to free or report.
+        if (removed == null)
+            return null;
+
+        // Without a reuse list the page is released to page memory directly.
+        if (reuseList == null)
+            pageMem.freePage(grpId, removed.pageId);
+
+        return new RootPage(new FullPageId(removed.pageId, grpId), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void destroy() throws IgniteCheckedException {
+        metaTree.destroy(); // The only action taken here is destroying the meta tree.
+    }
+
+    /**
+     * Tree of {@link IndexItem}s ordered by index name bytes; pages it allocates itself come from page memory.
+     */
+    private static class MetaTree extends BPlusTree<IndexItem, IndexItem> {
+        /** Partition ID passed to {@code allocatePage()} when this tree allocates a page. */
+        private final int allocPartId;
+
+        /** Last argument passed to {@code allocatePage()} when this tree allocates a page. */
+        private final byte allocSpace;
+
+        /**
+         * @param pageMem Page memory.
+         * @param metaPageId Meta page ID.
+         * @param reuseList Reuse list.
+         * @param innerIos Inner IOs.
+         * @param leafIos Leaf IOs.
+         * @throws IgniteCheckedException If failed.
+         */
+        private MetaTree(
+            final int cacheId, // The enclosing storage passes its cache group ID here.
+            final int allocPartId,
+            final byte allocSpace,
+            final PageMemory pageMem,
+            final IgniteWriteAheadLogManager wal,
+            final AtomicLong globalRmvId,
+            final long metaPageId,
+            final ReuseList reuseList,
+            final IOVersions<? extends BPlusInnerIO<IndexItem>> innerIos,
+            final IOVersions<? extends BPlusLeafIO<IndexItem>> leafIos,
+            final boolean initNew // Forwarded to initTree() below.
+        ) throws IgniteCheckedException {
+            super(treeName("meta", "Meta"), cacheId, pageMem, wal, globalRmvId, metaPageId, reuseList, innerIos, leafIos);
+
+            this.allocPartId = allocPartId;
+            this.allocSpace = allocSpace;
+
+            initTree(initNew);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+            return pageMem.allocatePage(groupId(), allocPartId, allocSpace);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int compare(final BPlusIO<IndexItem> io, final long pageAddr, final int idx,
+            final IndexItem row) throws IgniteCheckedException {
+            final int off = ((IndexIO)io).getOffset(pageAddr, idx);
+
+            int shift = 0;
+
+            // Compare index names: the stored name (left operand) against the name of the given row.
+            final int len = PageUtils.getUnsignedByte(pageAddr, off + shift);
+
+            shift += BYTE_LEN;
+            // Bytes are compared as signed values over the common prefix; the first difference decides.
+            for (int i = 0; i < len && i < row.idxName.length; i++) {
+                final int cmp = Byte.compare(PageUtils.getByte(pageAddr, off + i + shift), row.idxName[i]);
+
+                if (cmp != 0)
+                    return cmp;
+            }
+            // Common prefix is equal: the shorter name sorts first, equal lengths mean equal names.
+            return Integer.compare(len, row.idxName.length);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
+            final int idx, Object ignore) throws IgniteCheckedException {
+            return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
+        }
+    }
+
+    /**
+     * Meta tree row: a UTF-8 encoded index name paired with the ID of the index root page.
+     */
+    private static class IndexItem {
+        /** Index name bytes, UTF-8 encoded. */
+        private final byte[] idxName;
+
+        /** Root page ID; {@code 0} in keys built only for lookup or removal. */
+        private final long pageId;
+
+        /**
+         * @param idxName Index name.
+         * @param pageId Page ID.
+         */
+        private IndexItem(final byte[] idxName, final long pageId) {
+            this.idxName = idxName;
+            this.pageId = pageId;
+        }
+
+        /** {@inheritDoc} Decodes the name as UTF-8, the charset it was encoded with, not the platform default. */
+        @Override public String toString() {
+            return "I [idxName=" + new String(idxName, StandardCharsets.UTF_8) + ", pageId=" + U.hexLong(pageId) + ']';
+        }
+    }
+
+    /**
+     * Writes a row as: name length (one unsigned byte), name bytes, page ID (long).
+     *
+     * @param pageAddr Page address.
+     * @param off Offset at which the row starts.
+     * @param row Row to store.
+     */
+    private static void storeRow(
+        final long pageAddr,
+        int off,
+        final IndexItem row
+    ) {
+        // Index name length; getOrAllocateForTree caps names at MAX_IDX_NAME_LEN bytes before they are stored.
+        PageUtils.putUnsignedByte(pageAddr, off, row.idxName.length);
+        off++;
+
+        // Index name.
+        PageUtils.putBytes(pageAddr, off, row.idxName);
+        off += row.idxName.length;
+
+        // Page ID, written right after the name bytes rather than at a fixed position.
+        PageUtils.putLong(pageAddr, off, row.pageId);
+    }
+
+    /**
+     * Copies one stored row (length byte, name bytes, page ID) from a source page to a destination page.
+     *
+     * @param dstPageAddr Destination page address.
+     * @param dstOff Offset at which the row starts in the destination page.
+     * @param srcPageAddr Source page address.
+     * @param srcOff Offset at which the row starts in the source page.
+     */
+    private static void storeRow(
+        final long dstPageAddr,
+        int dstOff,
+        final long srcPageAddr,
+        int srcOff
+    ) {
+        // Index name length.
+        final int len = PageUtils.getUnsignedByte(srcPageAddr, srcOff);
+        srcOff++;
+
+        PageUtils.putUnsignedByte(dstPageAddr, dstOff, len);
+        dstOff++;
+        // Index name bytes; only the actual name length is copied.
+        PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len);
+        srcOff += len;
+        dstOff += len;
+
+        // Page ID.
+        PageUtils.putLong(dstPageAddr, dstOff, PageUtils.getLong(srcPageAddr, srcOff));
+    }
+
+    /**
+     * Reads a row laid out as: name length (one byte), name bytes, page ID (long).
+     *
+     * @param pageAddr Page address.
+     * @param off Offset at which the row starts.
+     * @return Read row.
+     */
+    private static IndexItem readRow(final long pageAddr, int off) {
+        // Index name length. NOTE(review): the 0xFF mask looks redundant after an unsigned read (other readers here omit it) - confirm.
+        final int len = PageUtils.getUnsignedByte(pageAddr, off) & 0xFF;
+        off++;
+
+        // Index name.
+        final byte[] idxName = PageUtils.getBytes(pageAddr, off, len);
+        off += len;
+
+        // Page ID, located right after the name bytes.
+        final long pageId = PageUtils.getLong(pageAddr, off);
+
+        return new IndexItem(idxName, pageId);
+    }
+
+    /**
+     * Implemented by both meta store IOs so that callers can resolve an item offset without knowing the IO type.
+     */
+    private interface IndexIO {
+        /**
+         * @param pageAddr Page address.
+         * @param idx Index.
+         * @return Offset in buffer according to {@code idx}.
+         */
+        int getOffset(long pageAddr, int idx);
+    }
+
+    /**
+     * Inner page IO for the meta tree; rows are written and read through the shared static row helpers.
+     */
+    public static final class MetaStoreInnerIO extends BPlusInnerIO<IndexItem> implements IndexIO {
+        /** IO versions; only version 1 is registered. */
+        public static final IOVersions<MetaStoreInnerIO> VERSIONS = new IOVersions<>(
+            new MetaStoreInnerIO(1)
+        );
+
+        /**
+         * @param ver Version.
+         */
+        private MetaStoreInnerIO(final int ver) {
+            // Item size: reserved name bytes, 1 byte for length, 8 bytes pageId.
+            super(T_METASTORE_INNER, ver, false, RESERVED_IDX_NAME_LEN + 1 + 8);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off, IndexItem row) throws IgniteCheckedException {
+            storeRow(pageAddr, off, row);
+        }
+
+        /** {@inheritDoc} The source offset is resolved through the source IO, which is cast to {@link IndexIO}. */
+        @Override public void store(final long dstPageAddr, final int dstIdx, final BPlusIO<IndexItem> srcIo,
+            final long srcPageAddr,
+            final int srcIdx) throws IgniteCheckedException {
+            storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcPageAddr, srcIdx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final long pageAddr,
+            final int idx) throws IgniteCheckedException {
+            return readRow(pageAddr, offset(idx));
+        }
+
+        /** {@inheritDoc} The page address is unused; the offset is derived from {@code idx} alone. */
+        @Override public int getOffset(long pageAddr, final int idx) {
+            return offset(idx);
+        }
+    }
+
+    /**
+     * Leaf page IO for the meta tree; rows are written and read through the shared static row helpers.
+     */
+    public static final class MetaStoreLeafIO extends BPlusLeafIO<IndexItem> implements IndexIO {
+        /** IO versions; only version 1 is registered. */
+        public static final IOVersions<MetaStoreLeafIO> VERSIONS = new IOVersions<>(
+            new MetaStoreLeafIO(1)
+        );
+
+        /**
+         * @param ver Version.
+         */
+        private MetaStoreLeafIO(final int ver) {
+            // Item size: reserved name bytes, 1 byte for length, 8 bytes pageId (same as the inner IO).
+            super(T_METASTORE_LEAF, ver, RESERVED_IDX_NAME_LEN + 1 + 8);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException {
+            storeRow(buf, off, row);
+        }
+
+        /** {@inheritDoc} The source offset is resolved through the source IO, which is cast to {@link IndexIO}. */
+        @Override public void store(final long dstPageAddr,
+            final int dstIdx,
+            final BPlusIO<IndexItem> srcIo,
+            final long srcPageAddr,
+            final int srcIdx) throws IgniteCheckedException {
+            storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcPageAddr, srcIdx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree,
+            final long pageAddr,
+            final int idx) throws IgniteCheckedException {
+            return readRow(pageAddr, offset(idx));
+        }
+
+        /** {@inheritDoc} The page address is unused; the offset is derived from {@code idx} alone. */
+        @Override public int getOffset(long pageAddr, final int idx) {
+            return offset(idx);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetaStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetaStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetaStore.java
deleted file mode 100644
index c09ce4e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetaStore.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * Meta store.
- */
-public interface MetaStore {
-    /**
-     * Get or allocate initial page for an index.
-     *
-     * @param idxName Index name.
-     * @return {@link RootPage} that keeps pageId, allocated flag that shows whether the page
-     *      was newly allocated, and rootId that is counter which increments each time new page allocated.
-     * @throws IgniteCheckedException If failed.
-     */
-    public RootPage getOrAllocateForTree(String idxName) throws IgniteCheckedException;
-
-    /**
-     * Deallocate index page and remove from tree.
-     *
-     * @param idxName Index name.
-     * @return Root ID or -1 if no page was removed.
-     * @throws IgniteCheckedException  If failed.
-     */
-    public RootPage dropRootPage(String idxName) throws IgniteCheckedException;
-
-    /**
-     * Destroy this meta store.
-     *
-     * @throws IgniteCheckedException  If failed.
-     */
-    public void destroy() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetadataStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetadataStorage.java
deleted file mode 100644
index e667807..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/MetadataStorage.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Metadata storage.
- */
-public class MetadataStorage implements MetaStore {
-    /** Max index name length (bytes num) */
-    public static final int MAX_IDX_NAME_LEN = 255;
-
-    /** Reserved size for index name. Needed for backward compatibility. */
-    public static final int RESERVED_IDX_NAME_LEN = 768;
-
-    /** Bytes in byte. */
-    private static final int BYTE_LEN = 1;
-
-    /** Page memory. */
-    private final PageMemory pageMem;
-
-    /** Index tree. */
-    private final MetaTree metaTree;
-
-    /** Meta page reuse tree. */
-    private final ReuseList reuseList;
-
-    /** Cache group ID. */
-    private final int grpId;
-
-    /** */
-    private final int allocPartId;
-
-    /** */
-    private final byte allocSpace;
-
-    /**
-     * @param pageMem Page memory.
-     * @param wal Write ahead log manager.
-     */
-    public MetadataStorage(
-        final PageMemory pageMem,
-        final IgniteWriteAheadLogManager wal,
-        final AtomicLong globalRmvId,
-        final int grpId,
-        final int allocPartId,
-        final byte allocSpace,
-        final ReuseList reuseList,
-        final long rootPageId,
-        final boolean initNew
-    ) {
-        try {
-            this.pageMem = pageMem;
-            this.grpId = grpId;
-            this.allocPartId = allocPartId;
-            this.allocSpace = allocSpace;
-            this.reuseList = reuseList;
-
-            metaTree = new MetaTree(grpId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId,
-                reuseList, MetaStoreInnerIO.VERSIONS, MetaStoreLeafIO.VERSIONS, initNew);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public RootPage getOrAllocateForTree(final String idxName) throws IgniteCheckedException {
-        final MetaTree tree = metaTree;
-
-        synchronized (this) {
-            byte[] idxNameBytes = idxName.getBytes(StandardCharsets.UTF_8);
-
-            if (idxNameBytes.length > MAX_IDX_NAME_LEN)
-                throw new IllegalArgumentException("Too long encoded indexName [maxAllowed=" + MAX_IDX_NAME_LEN +
-                    ", currentLength=" + idxNameBytes.length + ", name=" + idxName + "]");
-
-            final IndexItem row = tree.findOne(new IndexItem(idxNameBytes, 0));
-
-            if (row == null) {
-                long pageId = 0;
-
-                if (reuseList != null)
-                    pageId = reuseList.takeRecycledPage();
-
-                pageId = pageId == 0 ? pageMem.allocatePage(grpId, allocPartId, allocSpace) : pageId;
-
-                tree.put(new IndexItem(idxNameBytes, pageId));
-
-                return new RootPage(new FullPageId(pageId, grpId), true);
-            }
-            else {
-                final FullPageId pageId = new FullPageId(row.pageId, grpId);
-
-                return new RootPage(pageId, false);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public RootPage dropRootPage(final String idxName)
-        throws IgniteCheckedException {
-        byte[] idxNameBytes = idxName.getBytes(StandardCharsets.UTF_8);
-
-        final IndexItem row = metaTree.remove(new IndexItem(idxNameBytes, 0));
-
-        if (row != null) {
-            if (reuseList == null)
-                pageMem.freePage(grpId, row.pageId);
-        }
-
-        return row != null ? new RootPage(new FullPageId(row.pageId, grpId), false) : null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void destroy() throws IgniteCheckedException {
-        metaTree.destroy();
-    }
-
-    /**
-     *
-     */
-    private static class MetaTree extends BPlusTree<IndexItem, IndexItem> {
-        /** */
-        private final int allocPartId;
-
-        /** */
-        private final byte allocSpace;
-
-        /**
-         * @param pageMem Page memory.
-         * @param metaPageId Meta page ID.
-         * @param reuseList Reuse list.
-         * @param innerIos Inner IOs.
-         * @param leafIos Leaf IOs.
-         * @throws IgniteCheckedException If failed.
-         */
-        private MetaTree(
-            final int cacheId,
-            final int allocPartId,
-            final byte allocSpace,
-            final PageMemory pageMem,
-            final IgniteWriteAheadLogManager wal,
-            final AtomicLong globalRmvId,
-            final long metaPageId,
-            final ReuseList reuseList,
-            final IOVersions<? extends BPlusInnerIO<IndexItem>> innerIos,
-            final IOVersions<? extends BPlusLeafIO<IndexItem>> leafIos,
-            final boolean initNew
-        ) throws IgniteCheckedException {
-            super(treeName("meta", "Meta"), cacheId, pageMem, wal, globalRmvId, metaPageId, reuseList, innerIos, leafIos);
-
-            this.allocPartId = allocPartId;
-            this.allocSpace = allocSpace;
-
-            initTree(initNew);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
-            return pageMem.allocatePage(groupId(), allocPartId, allocSpace);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected int compare(final BPlusIO<IndexItem> io, final long pageAddr, final int idx,
-            final IndexItem row) throws IgniteCheckedException {
-            final int off = ((IndexIO)io).getOffset(pageAddr, idx);
-
-            int shift = 0;
-
-            // Compare index names.
-            final int len = PageUtils.getUnsignedByte(pageAddr, off + shift);
-
-            shift += BYTE_LEN;
-
-            for (int i = 0; i < len && i < row.idxName.length; i++) {
-                final int cmp = Byte.compare(PageUtils.getByte(pageAddr, off + i + shift), row.idxName[i]);
-
-                if (cmp != 0)
-                    return cmp;
-            }
-
-            return Integer.compare(len, row.idxName.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
-            final int idx, Object ignore) throws IgniteCheckedException {
-            return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
-        }
-    }
-
-    /**
-     *
-     */
-    private static class IndexItem {
-        /** */
-        private byte[] idxName;
-
-        /** */
-        private long pageId;
-
-        /**
-         * @param idxName Index name.
-         * @param pageId Page ID.
-         */
-        private IndexItem(final byte[] idxName, final long pageId) {
-            this.idxName = idxName;
-            this.pageId = pageId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "I [idxName=" + new String(idxName) + ", pageId=" + U.hexLong(pageId) + ']';
-        }
-    }
-
-    /**
-     * Store row to buffer.
-     *
-     * @param pageAddr Page address.
-     * @param off Offset in buf.
-     * @param row Row to store.
-     */
-    private static void storeRow(
-        final long pageAddr,
-        int off,
-        final IndexItem row
-    ) {
-        // Index name length.
-        PageUtils.putUnsignedByte(pageAddr, off, row.idxName.length);
-        off++;
-
-        // Index name.
-        PageUtils.putBytes(pageAddr, off, row.idxName);
-        off += row.idxName.length;
-
-        // Page ID.
-        PageUtils.putLong(pageAddr, off, row.pageId);
-    }
-
-    /**
-     * Copy row data.
-     *
-     * @param dstPageAddr Destination page address.
-     * @param dstOff Destination buf offset.
-     * @param srcPageAddr Source page address.
-     * @param srcOff Src buf offset.
-     */
-    private static void storeRow(
-        final long dstPageAddr,
-        int dstOff,
-        final long srcPageAddr,
-        int srcOff
-    ) {
-        // Index name length.
-        final int len = PageUtils.getUnsignedByte(srcPageAddr, srcOff);
-        srcOff++;
-
-        PageUtils.putUnsignedByte(dstPageAddr, dstOff, len);
-        dstOff++;
-
-        PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len);
-        srcOff += len;
-        dstOff += len;
-
-        // Page ID.
-        PageUtils.putLong(dstPageAddr, dstOff, PageUtils.getLong(srcPageAddr, srcOff));
-    }
-
-    /**
-     * Read row from buffer.
-     *
-     * @param pageAddr Page address.
-     * @param off Offset.
-     * @return Read row.
-     */
-    private static IndexItem readRow(final long pageAddr, int off) {
-        // Index name length.
-        final int len = PageUtils.getUnsignedByte(pageAddr, off) & 0xFF;
-        off++;
-
-        // Index name.
-        final byte[] idxName = PageUtils.getBytes(pageAddr, off, len);
-        off += len;
-
-        // Page ID.
-        final long pageId = PageUtils.getLong(pageAddr, off);
-
-        return new IndexItem(idxName, pageId);
-    }
-
-    /**
-     *
-     */
-    private interface IndexIO {
-        /**
-         * @param pageAddr Page address.
-         * @param idx Index.
-         * @return Offset in buffer according to {@code idx}.
-         */
-        int getOffset(long pageAddr, int idx);
-    }
-
-    /**
-     *
-     */
-    public static final class MetaStoreInnerIO extends BPlusInnerIO<IndexItem> implements IndexIO {
-        /** */
-        public static final IOVersions<MetaStoreInnerIO> VERSIONS = new IOVersions<>(
-            new MetaStoreInnerIO(1)
-        );
-
-        /**
-         * @param ver Version.
-         */
-        private MetaStoreInnerIO(final int ver) {
-            // name bytes and 1 byte for length, 8 bytes pageId
-            super(T_METASTORE_INNER, ver, false, RESERVED_IDX_NAME_LEN + 1 + 8);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void storeByOffset(long pageAddr, int off, IndexItem row) throws IgniteCheckedException {
-            storeRow(pageAddr, off, row);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void store(final long dstPageAddr, final int dstIdx, final BPlusIO<IndexItem> srcIo,
-            final long srcPageAddr,
-            final int srcIdx) throws IgniteCheckedException {
-            storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcPageAddr, srcIdx));
-        }
-
-        /** {@inheritDoc} */
-        @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final long pageAddr,
-            final int idx) throws IgniteCheckedException {
-            return readRow(pageAddr, offset(idx));
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getOffset(long pageAddr, final int idx) {
-            return offset(idx);
-        }
-    }
-
-    /**
-     *
-     */
-    public static final class MetaStoreLeafIO extends BPlusLeafIO<IndexItem> implements IndexIO {
-        /** */
-        public static final IOVersions<MetaStoreLeafIO> VERSIONS = new IOVersions<>(
-            new MetaStoreLeafIO(1)
-        );
-
-        /**
-         * @param ver Version.
-         */
-        private MetaStoreLeafIO(final int ver) {
-            // 4 byte cache ID, UTF-16 symbols and 1 byte for length, 8 bytes pageId
-            super(T_METASTORE_LEAF, ver, RESERVED_IDX_NAME_LEN + 1 + 8);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException {
-            storeRow(buf, off, row);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void store(final long dstPageAddr,
-            final int dstIdx,
-            final BPlusIO<IndexItem> srcIo,
-            final long srcPageAddr,
-            final int srcIdx) throws IgniteCheckedException {
-            storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcPageAddr, srcIdx));
-        }
-
-        /** {@inheritDoc} */
-        @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree,
-            final long pageAddr,
-            final int idx) throws IgniteCheckedException {
-            return readRow(pageAddr, offset(idx));
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getOffset(long pageAddr, final int idx) {
-            return offset(idx);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RecoveryDebug.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RecoveryDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RecoveryDebug.java
new file mode 100644
index 0000000..54017b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RecoveryDebug.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.TimeZone;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ *
+ */
+public class RecoveryDebug implements AutoCloseable {
+    /** */
+    private static final ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
+        /** {@inheritDoc} */
+        @Override protected SimpleDateFormat initialValue() {
+            SimpleDateFormat f = new SimpleDateFormat("dd-MM-yyyy-HH-mm-ss-SSS");
+
+            f.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+            return f;
+        }
+    };
+
+    /** */
+    @Nullable private final IgniteLogger log;
+
+    /** */
+    @Nullable private FileChannel fc;
+
+    /**
+     * @param constId Consistent ID.
+     */
+    public RecoveryDebug(Object constId, long time) {
+        this(constId, time, null);
+    }
+
+    /**
+     * @param constId Consistent ID.
+     * @param log Logger.
+     */
+    public RecoveryDebug(Object constId,long time, @Nullable IgniteLogger log) {
+        this.log = log;
+
+        try {
+            String workDir = U.defaultWorkDirectory();
+
+            File tmpDir = new File(workDir, "tmp");
+
+            if (!tmpDir.exists())
+                if (!tmpDir.mkdir())
+                    return;
+
+            File f = new File(tmpDir, "recovery-" +
+                sdf.get().format(new Date(time)) + "-" + constId  +".log");
+
+            f.createNewFile();
+
+            fc = FileChannel.open(Paths.get(f.getPath()), EnumSet.of(CREATE, READ, WRITE));
+        }
+        catch (IgniteCheckedException | IOException e) {
+            U.error(log, "Fail create recovery debug file.", e);
+
+            fc = null;
+        }
+    }
+
+    /**
+     * @param rec TX record to append.
+     * @return {@code this} for convenience.
+     */
+    public RecoveryDebug append(TxRecord rec) {
+        GridCacheVersion txVer = rec.nearXidVersion();
+
+        return fc == null ? this : appendFile(
+            "Tx record " + rec.state() + " " + rec.nearXidVersion() + " timestamp " + rec.timestamp()
+        );
+    }
+
+    /**
+     * @param rec Data record to append.
+     * @param unwrapKeyValue Whether to also append the key and value of each data entry.
+     * @return {@code this} for convenience.
+     */
+    public RecoveryDebug append(DataRecord rec, boolean unwrapKeyValue) {
+        if (fc == null)
+            return this;
+
+        append("Data record\n");
+
+        for (DataEntry dataEntry :  rec.writeEntries())
+            append("\t" + dataEntry.op() + " " + dataEntry.nearXidVersion() +
+                (unwrapKeyValue ? " " + dataEntry.key() + " " + dataEntry.value() : "") + "\n"
+            );
+
+        return this;
+    }
+
+    /**
+     * @param st Statement to append.
+     * @return {@code this} for convenience.
+     */
+    public RecoveryDebug append(Object st) {
+        return fc == null ? this : appendFile(st);
+    }
+
+    /**
+     * @param st Statement to append.
+     * @return {@code this} for convenience.
+     */
+    private RecoveryDebug appendFile(Object st) {
+        try {
+            fc.write(ByteBuffer.wrap(st.toString().getBytes()));
+        }
+        catch (IOException e) {
+            U.error(null, "Fail write to recovery dump file.", e);
+        }
+
+        return this;
+    }
+
+    /**
+     * Closes this debug instance.
+     */
+    @Override public void close() {
+        if (fc != null) {
+            try {
+                fc.force(true);
+
+                fc.close();
+            }
+            catch (IOException e) {
+                U.error(null, "Fail close recovery dump file.", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
new file mode 100644
index 0000000..ae200df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+/**
+ * Simple interface for data stored in some RowStore.
+ */
+public interface Storable {
+    /**
+     * @param link Link for this row.
+     */
+    public void link(long link);
+
+    /**
+     * @return Link for this row.
+     */
+    public long link();
+
+    /**
+     * @return Partition.
+     */
+    public int partition();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
new file mode 100644
index 0000000..3c54a49
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileDownloader.java
@@ -0,0 +1,189 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteInClosureX;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Receiving side of direct node-to-node file transfer.
+ */
+public class FileDownloader {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private static final int CHUNK_SIZE = 1024 * 1024;
+
+    /** */
+    private final Path path;
+
+    /** */
+    private final AtomicLong size = new AtomicLong(-1);
+
+    /** */
+    private ServerSocketChannel serverChannel;
+
+    /** */
+    private volatile GridFutureAdapter<?> finishFut;
+
+    /** */
+    private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+
+    /**
+     *
+     */
+    public FileDownloader(IgniteLogger log, Path path) {
+        this.log = log;
+        this.path = path;
+    }
+
+    /**
+     *
+     */
+    public InetSocketAddress start() throws IgniteCheckedException {
+        try {
+            ServerSocketChannel ch = ServerSocketChannel.open();
+
+            ch.bind(null);
+
+            serverChannel = ch;
+
+            return (InetSocketAddress)ch.getLocalAddress();
+        }
+        catch (Exception ex) {
+            throw new IgniteCheckedException(ex);
+        }
+    }
+
+    /**
+     *
+     */
+    public void download(GridFutureAdapter<?> fut){
+        this.finishFut = fut;
+
+        final ServerSocketChannel ch = serverChannel;
+
+        fut.listen(new IgniteInClosureX<IgniteInternalFuture<?>>() {
+            @Override public void applyx(IgniteInternalFuture<?> future) throws IgniteCheckedException {
+                try {
+
+                    if (log != null && log.isInfoEnabled())
+                        log.info("Server socket closed " + ch.getLocalAddress());
+
+                    ch.close();
+                }
+                catch (Exception ex) {
+                    U.error(log, "Fail close socket.", ex);
+
+                    throw new IgniteCheckedException(ex);
+                }
+            }
+        });
+
+        FileChannel writeChannel = null;
+        SocketChannel readChannel = null;
+
+        try {
+            File f = new File(path.toUri().getPath());
+
+            if (f.exists())
+                f.delete();
+
+            File cacheWorkDir = f.getParentFile();
+
+            if (!cacheWorkDir.exists())
+                cacheWorkDir.mkdir();
+
+            writeChannel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+
+            initFut.onDone();
+
+            readChannel = serverChannel.accept();
+
+            long pos = 0;
+
+            long size = this.size.get();
+
+            while (size == -1 || pos < size) {
+                pos += writeChannel.transferFrom(readChannel, pos, CHUNK_SIZE);
+
+                if (size == -1)
+                    size = this.size.get();
+            }
+        }
+        catch (IOException ex) {
+            initFut.onDone(ex);
+
+            fut.onDone(ex);
+        }
+        finally {
+            try {
+                if (writeChannel != null)
+                    writeChannel.close();
+            }
+            catch (IOException ex) {
+                throw new IgniteException("Could not close file: " + path);
+            }
+
+            try {
+                if (readChannel != null)
+                    readChannel.close();
+            }
+            catch (IOException ex) {
+                throw new IgniteException("Could not close socket");
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    public void download(long size, Throwable th) {
+        try {
+            initFut.get();
+
+            if (th != null)
+                finishFut.onDone(th);
+            else {
+                if (!this.size.compareAndSet(-1, size))
+                    finishFut.onDone(new IgniteException("Size mismatch: " + this.size.get() + " != " + size));
+                else
+                    finishFut.onDone();
+            }
+
+        }
+        catch (IgniteCheckedException e) {
+            finishFut.onDone(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 47f1d4d..053ab2b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -292,8 +292,10 @@ public class FilePageStore implements PageStore {
         lock.writeLock().lock();
 
         try {
+            // Since we always have a meta-page in the store, never revert allocated counter to a value smaller than
+            // header + page.
             if (inited)
-                allocated.set(fileIO.size());
+                allocated.set(Math.max(headerSize() + pageSize, fileIO.size()));
 
             recover = false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 66af0dd..d406df6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -48,11 +48,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -196,8 +198,23 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     }
 
     /** {@inheritDoc} */
+    @Override public void initializeForMetastorage()
+        throws IgniteCheckedException {
+        int grpId = MetaStorage.METASTORAGE_CACHE_ID;
+
+        if (!idxCacheStores.containsKey(grpId)) {
+            CacheStoreHolder holder = initDir(new File(storeWorkDir, "metastorage"), grpId, 1);
+
+            CacheStoreHolder old = idxCacheStores.put(grpId, holder);
+
+            assert old == null : "Non-null old store holder for metastorage";
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException {
-        File cacheWorkDir = cacheWorkDirectory(cacheData.config());
+        File cacheWorkDir = cacheWorkDir(cacheData.config());
+
         File file;
 
         checkAndInitCacheWorkDir(cacheWorkDir);
@@ -321,18 +338,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     }
 
     /**
-     * @param ccfg Cache configuration.
-     * @return Cache work directory.
+     *
      */
-    private File cacheWorkDirectory(CacheConfiguration ccfg) {
-        String dirName;
-
-        if (ccfg.getGroupName() != null)
-            dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
-        else
-            dirName = CACHE_DIR_PREFIX + ccfg.getName();
-
-        return new File(storeWorkDir, dirName);
+    public Path getPath(boolean isSharedGroup, String cacheOrGroupName, int partId) {
+        return getPartitionFile(cacheWorkDir(isSharedGroup, cacheOrGroupName), partId).toPath();
     }
 
     /**
@@ -344,25 +353,36 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException {
         assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName();
 
-        File cacheWorkDir = cacheWorkDirectory(ccfg);
+        File cacheWorkDir = cacheWorkDir(ccfg);
+
+        return initDir(cacheWorkDir, grpDesc.groupId(), grpDesc.config().getAffinity().partitions());
+    }
 
+    /**
+     * @param cacheWorkDir Work directory.
+     * @param grpId Group ID.
+     * @param partitions Number of partitions.
+     * @return Cache store holder.
+     * @throws IgniteCheckedException If failed.
+     */
+    private CacheStoreHolder initDir(File cacheWorkDir, int grpId, int partitions) throws IgniteCheckedException {
         boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
 
         File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME);
 
         if (dirExisted && !idxFile.exists())
-            grpsWithoutIdx.add(grpDesc.groupId());
+            grpsWithoutIdx.add(grpId);
 
         FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
             dsCfg.getFileIOFactory(), igniteCfg.getDataStorageConfiguration());
 
         FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile);
 
-        FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
+        FilePageStore[] partStores = new FilePageStore[partitions];
 
         for (int partId = 0; partId < partStores.length; partId++) {
             FilePageStore partStore = pageStoreFactory.createPageStore(
-                PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)));
+                PageMemory.FLAG_DATA, getPartitionFile(cacheWorkDir, partId));
 
             partStores[partId] = partStore;
         }
@@ -372,6 +392,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
     /**
      * @param cacheWorkDir Cache work directory.
+     * @param partId Partition id.
+     */
+    @NotNull private File getPartitionFile(File cacheWorkDir, int partId) {
+        return new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * @param cacheWorkDir Cache work directory.
      */
     private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws IgniteCheckedException {
         boolean dirExisted = false;
@@ -558,8 +586,21 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
      * @return Store dir for given cache.
      */
     public File cacheWorkDir(CacheConfiguration ccfg) {
-        String dirName = ccfg.getGroupName() == null ?
-            CACHE_DIR_PREFIX + ccfg.getName() : CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
+        boolean isSharedGrp = ccfg.getGroupName() != null;
+
+        return cacheWorkDir(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
+    }
+
+    /**
+     *
+     */
+    public File cacheWorkDir(boolean isSharedGroup, String cacheOrGroupName) {
+        String dirName;
+
+        if (isSharedGroup)
+            dirName = CACHE_GRP_DIR_PREFIX + cacheOrGroupName;
+        else
+            dirName = CACHE_DIR_PREFIX + cacheOrGroupName;
 
         return new File(storeWorkDir, dirName);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
new file mode 100644
index 0000000..ba21ae9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileUploader.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Sending side of direct node-to-node file transfer.
+ */
+public class FileUploader {
+    /** */
+    private static final int CHUNK_SIZE = 1024 * 1024;
+
+    /** */
+    private final Path path;
+
+    /**
+     *
+     */
+    public FileUploader(Path path) {
+        this.path = path;
+    }
+
+    /**
+     *
+     */
+    public void upload(SocketChannel writeChannel, GridFutureAdapter<Long> finishFut) {
+        FileChannel readChannel = null;
+
+        try {
+            File file = new File(path.toUri().getPath());
+
+            if (!file.exists()) {
+                finishFut.onDone(
+                    new IgniteCheckedException(
+                        new FileNotFoundException(file.getAbsolutePath())
+                    )
+                );
+
+                return;
+            }
+
+            readChannel = FileChannel.open(path, StandardOpenOption.READ);
+
+            long written = 0;
+
+            long size = readChannel.size();
+
+            while (written < size)
+                written += readChannel.transferTo(written, CHUNK_SIZE, writeChannel);
+
+            finishFut.onDone(written);
+        }
+        catch (IOException ex) {
+            finishFut.onDone(ex);
+        }
+        finally {
+            // FIXME: if writeChannel.close() throws, readChannel is never closed. This needs to be fixed.
+            try {
+                if (writeChannel != null)
+                    writeChannel.close();
+            }
+            catch (IOException ex) {
+                throw new IgniteException("Could not close socket.");
+            }
+
+            try {
+                if (readChannel != null)
+                    readChannel.close();
+            }
+            catch (IOException ex) {
+                throw new IgniteException("Could not close file: " + path);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
new file mode 100644
index 0000000..83ff91b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * {@link FileIO} that allows working with a ZIP-compressed file.
+ * Doesn't allow random access and setting {@link FileIO#position()} backwards.
+ * Allows sequential reads including setting {@link FileIO#position()} forward.
+ */
+public class UnzipFileIO implements FileIO {
+    /** Zip input stream. */
+    private final ZipInputStream zis;
+
+    /** Byte array for draining data. */
+    private final byte[] arr = new byte[128 * 1024];
+
+    /** Size of uncompressed data. */
+    private final long size;
+
+    /** Total bytes read counter. */
+    private long totalBytesRead = 0;
+
+    /**
+     * @param zip Compressed file.
+     */
+    public UnzipFileIO(File zip) throws IOException {
+        zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
+
+        ZipEntry entry = zis.getNextEntry();
+        size = entry.getSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() throws IOException {
+        return totalBytesRead;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(long newPosition) throws IOException {
+        if (newPosition == totalBytesRead)
+            return;
+
+        if (newPosition < totalBytesRead)
+            throw new UnsupportedOperationException("Seeking backwards is not supported.");
+
+        long bytesRemaining = newPosition - totalBytesRead;
+
+        while (bytesRemaining > 0) {
+            int bytesToRead = bytesRemaining > arr.length ? arr.length : (int)bytesRemaining;
+
+            bytesRemaining -= zis.read(arr, 0, bytesToRead);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer dstBuf) throws IOException {
+        int bytesRead = zis.read(arr, 0, Math.min(dstBuf.remaining(), arr.length));
+
+        if (bytesRead == -1)
+            return -1;
+
+        dstBuf.put(arr, 0, bytesRead);
+
+        totalBytesRead += bytesRead;
+
+        return bytesRead;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer dstBuf, long position) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] buf, int off, int len) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer srcBuf) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] buf, int off, int len) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size() throws IOException {
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        zis.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsConsistentIdProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsConsistentIdProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsConsistentIdProcessor.java
index e7a7e63..ba6d822 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsConsistentIdProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/PdsConsistentIdProcessor.java
@@ -161,6 +161,9 @@ public class PdsConsistentIdProcessor extends GridProcessorAdapter implements Pd
         if (!CU.isPersistenceEnabled(cfg))
             return compatibleResolve(pstStoreBasePath, consistentId);
 
+        if (ctx.clientNode())
+            return new PdsFolderSettings(pstStoreBasePath, UUID.randomUUID());
+
         if (getBoolean(IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID, false))
             return compatibleResolve(pstStoreBasePath, consistentId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
new file mode 100644
index 0000000..0847ca6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.Storable;
+import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ */
+public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList {
+    /** Number of buckets; passed to the {@link PagesList} constructor. */
+    private static final int BUCKETS = 256; // Must be power of 2.
+
+    /** Index of the last bucket, which {@code isReuseBucket} treats as the reuse bucket. */
+    private static final int REUSE_BUCKET = BUCKETS - 1;
+
+    /** Value returned by the write handler once the whole row has been written. */
+    private static final Integer COMPLETE = Integer.MAX_VALUE;
+
+    /** Failure marker passed to {@code write} when inserting a row. */
+    private static final Integer FAIL_I = Integer.MIN_VALUE;
+
+    /** Failure marker passed to {@code write} when removing a row. */
+    private static final Long FAIL_L = Long.MAX_VALUE;
+
+    /** The row handlers put a page into a bucket only if its free space is greater than this value. */
+    private static final int MIN_PAGE_FREE_SPACE = 8;
+
+    /** Right shift that converts a free space size into a bucket index; computed in the constructor. */
+    private final int shift;
+
+    /** Stripes of every bucket, indexed by bucket number. */
+    private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);
+
+    /** Page size minus {@code AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD}; set in the constructor. */
+    private final int MIN_SIZE_FOR_DATA_PAGE;
+
+    /** Bucket computed for {@code MIN_SIZE_FOR_DATA_PAGE}; empty data pages are taken from it. */
+    private final int emptyDataPagesBucket;
+
+    /** Handler used by {@code updateDataRow}. */
+    private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler();
+
+    /** Data region metrics; used here to count pages of large entries. */
+    private final DataRegionMetricsImpl memMetrics;
+
+    /** Page eviction tracker obtained from the data region. */
+    private final PageEvictionTracker evictionTracker;
+
+    /**
+     * Tries to update a row on its data page and logs a {@link DataPageUpdateRecord} if a WAL delta is needed.
+     */
+    private final class UpdateRowHandler extends PageHandler<T, Boolean> {
+        @Override public Boolean run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            T row,
+            int itemId)
+            throws IgniteCheckedException {
+            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+            int rowSize = io.getRowSize(row);
+
+            boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
+
+            evictionTracker.touchPage(pageId); // The page is touched whether or not the update succeeded.
+
+            if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
+                // TODO This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[rowSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
+
+                assert data.payloadSize() == rowSize;
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); // Copy the row bytes out of the page.
+
+                wal.log(new DataPageUpdateRecord(
+                    cacheId,
+                    pageId,
+                    itemId,
+                    payload));
+            }
+
+            return updated;
+        }
+    }
+
+    /** Handler used by {@code insertDataRow} to write a row or a row fragment into a page. */
+    private final PageHandler<T, Integer> writeRow = new WriteRowHandler();
+
+    /**
+     * Writes a row (or its next fragment) into a data page and re-buckets the page if enough free space is left.
+     */
+    private final class WriteRowHandler extends PageHandler<T, Integer> {
+        @Override public Integer run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            T row,
+            int written)
+            throws IgniteCheckedException {
+            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+            int rowSize = io.getRowSize(row);
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace > 0 : oldFreeSpace;
+
+            // If the full row does not fit into this page, or part of it was already written, write only a fragment.
+            written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
+                addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
+
+            // Reread free space after update.
+            int newFreeSpace = io.getFreeSpace(pageAddr);
+
+            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+                int bucket = bucket(newFreeSpace, false);
+
+                put(null, pageId, page, pageAddr, bucket);
+            }
+
+            if (written == rowSize)
+                evictionTracker.touchPage(pageId); // Touched only by the write that completes the row.
+
+            // Avoid boxing with garbage generation for usual case.
+            return written == rowSize ? COMPLETE : written;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param page Page pointer.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param rowSize Row size.
+         * @return Written size which is always equal to row size here.
+         * @throws IgniteCheckedException If failed.
+         */
+        private int addRow(
+            long pageId,
+            long page,
+            long pageAddr,
+            AbstractDataPageIO<T> io,
+            T row,
+            int rowSize
+        ) throws IgniteCheckedException {
+            io.addRow(pageId, pageAddr, row, rowSize, pageSize());
+
+            if (needWalDeltaRecord(pageId, page, null)) {
+                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[rowSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+                assert data.payloadSize() == rowSize;
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+                wal.log(new DataPageInsertRecord(
+                    grpId,
+                    pageId,
+                    payload));
+            }
+
+            return rowSize;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param page Page pointer.
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param row Row.
+         * @param written Size of the row part written before this call.
+         * @param rowSize Row size.
+         * @return Updated written size, i.e. {@code written} plus the size of the fragment added here.
+         * @throws IgniteCheckedException If failed.
+         */
+        private int addRowFragment(
+            long pageId,
+            long page,
+            long pageAddr,
+            AbstractDataPageIO<T> io,
+            T row,
+            int written,
+            int rowSize
+        ) throws IgniteCheckedException {
+            // Read last link before the fragment write, because it will be updated there.
+            long lastLink = row.link();
+
+            int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
+
+            assert payloadSize > 0 : payloadSize;
+
+            if (needWalDeltaRecord(pageId, page, null)) {
+                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
+                byte[] payload = new byte[payloadSize];
+
+                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
+
+                wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink));
+            }
+
+            return written + payloadSize;
+        }
+    }
+
+    /** Handler used by {@code removeDataRowByLink}; created in the constructor. */
+    private final PageHandler<Void, Long> rmvRow;
+
+    /**
+     * Removes a row from a data page, logs the removal to WAL if needed and updates the page's bucket.
+     */
+    private final class RemoveRowHandler extends PageHandler<Void, Long> {
+        /** Indicates whether partition ID should be masked from page ID. */
+        private final boolean maskPartId;
+
+        /** @param maskPartId Whether partition ID should be masked from page ID. */
+        RemoveRowHandler(boolean maskPartId) {
+            this.maskPartId = maskPartId;
+        }
+
+        @Override public Long run(
+            int cacheId,
+            long pageId,
+            long page,
+            long pageAddr,
+            PageIO iox,
+            Boolean walPlc,
+            Void ignored,
+            int itemId)
+            throws IgniteCheckedException {
+            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+            int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+            assert oldFreeSpace >= 0 : oldFreeSpace;
+
+            long nextLink = io.removeRow(pageAddr, itemId, pageSize());
+
+            if (needWalDeltaRecord(pageId, page, walPlc))
+                wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));
+
+            int newFreeSpace = io.getFreeSpace(pageAddr);
+
+            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+                int newBucket = bucket(newFreeSpace, false);
+
+                if (oldFreeSpace > MIN_PAGE_FREE_SPACE) {
+                    int oldBucket = bucket(oldFreeSpace, false);
+
+                    if (oldBucket != newBucket) {
+                        // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
+                        pageId = maskPartId ? PageIdUtils.maskPartitionId(pageId) : pageId;
+                        if (removeDataPage(pageId, page, pageAddr, io, oldBucket))
+                            put(null, pageId, page, pageAddr, newBucket);
+                    }
+                }
+                else
+                    put(null, pageId, page, pageAddr, newBucket); // Free space did not exceed the threshold before.
+
+                if (io.isEmpty(pageAddr))
+                    evictionTracker.forgetPage(pageId); // NOTE(review): pageId may have been masked above - confirm intended.
+            }
+
+            // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
+            return nextLink;
+        }
+    }
+
+    /**
+     * @param cacheId Cache ID; when it is {@code 0} the remove handler masks the partition ID of page IDs.
+     * @param name Name (for debug purpose).
+     * @param memMetrics Memory metrics.
+     * @param memPlc Data region; supplies the page memory and the eviction tracker.
+     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
+     * @param wal Write ahead log manager.
+     * @param metaPageId Metadata page ID.
+     * @param initNew {@code True} if new metadata should be initialized.
+     * @throws IgniteCheckedException If failed.
+     */
+    public AbstractFreeList(
+        int cacheId,
+        String name,
+        DataRegionMetricsImpl memMetrics,
+        DataRegion memPlc,
+        ReuseList reuseList,
+        IgniteWriteAheadLogManager wal,
+        long metaPageId,
+        boolean initNew) throws IgniteCheckedException {
+        super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId);
+
+        rmvRow = new RemoveRowHandler(cacheId == 0); // Partition ID masking is enabled only for cacheId 0.
+
+        this.evictionTracker = memPlc.evictionTracker();
+        this.reuseList = reuseList == null ? this : reuseList;
+        int pageSize = pageMem.pageSize();
+
+        assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
+        assert U.isPow2(BUCKETS);
+        assert BUCKETS <= pageSize : pageSize;
+
+        // TODO this constant is used because currently we cannot reuse data pages as index pages
+        // TODO and vice-versa. It should be removed when data storage format is finalized.
+        MIN_SIZE_FOR_DATA_PAGE = pageSize - AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD;
+
+        int shift = 0;
+
+        while (pageSize > BUCKETS) { // Ends with shift == log2(pageSize / BUCKETS).
+            shift++;
+            pageSize >>>= 1;
+        }
+
+        this.shift = shift;
+
+        this.memMetrics = memMetrics;
+
+        emptyDataPagesBucket = bucket(MIN_SIZE_FOR_DATA_PAGE, false); // Requires this.shift to be set already.
+
+        init(metaPageId, initNew);
+    }
+
+    /**
+     * Calculates average fill factor over this free list.
+     *
+     * @return Tuple (numerator, denominator); {@code (0, 0)} when the counted buckets hold no pages.
+     */
+    public T2<Long, Long> fillFactor() {
+        long pageSize = pageSize();
+
+        long totalSize = 0;
+        long loadSize = 0;
+
+        for (int b = BUCKETS - 2; b > 0; b--) { // Neither the reuse bucket nor bucket 0 is counted.
+            long bsize = pageSize - ((REUSE_BUCKET - b) << shift);
+
+            long pages = bucketsSize[b].longValue();
+
+            loadSize += pages * (pageSize - bsize);
+
+            totalSize += pages * pageSize;
+        }
+
+        return totalSize == 0 ? new T2<>(0L, 0L) : new T2<>(loadSize, totalSize);
+    }
+
+    /** {@inheritDoc} Logs the data page and reuse page counts when at least one data page is tracked. */
+    @Override public void dumpStatistics(IgniteLogger log) {
+        long dataPages = 0;
+
+        final boolean dumpBucketsInfo = false; // Per-bucket logging below runs only if this is changed to true.
+
+        for (int b = 0; b < BUCKETS; b++) {
+            long size = bucketsSize[b].longValue();
+
+            if (!isReuseBucket(b))
+                dataPages += size;
+
+            if (dumpBucketsInfo) {
+                Stripe[] stripes = getBucket(b);
+
+                boolean empty = true;
+
+                if (stripes != null) {
+                    for (Stripe stripe : stripes) {
+                        if (!stripe.empty) {
+                            empty = false;
+
+                            break;
+                        }
+                    }
+                }
+
+                if (log.isInfoEnabled())
+                    log.info("Bucket [b=" + b +
+                        ", size=" + size +
+                        ", stripes=" + (stripes != null ? stripes.length : 0) +
+                        ", stripesEmpty=" + empty + ']');
+            }
+        }
+
+        if (dataPages > 0) {
+            if (log.isInfoEnabled())
+                log.info("FreeList [name=" + name +
+                    ", buckets=" + BUCKETS +
+                    ", dataPages=" + dataPages +
+                    ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
+        }
+    }
+
+    /**
+     * @param freeSpace Page free space; must be positive.
+     * @param allowReuse {@code True} if it is allowed to get reuse bucket.
+     * @return Bucket index, i.e. {@code freeSpace >>> shift}, lowered by one if the reuse bucket is not allowed.
+     */
+    private int bucket(int freeSpace, boolean allowReuse) {
+        assert freeSpace > 0 : freeSpace;
+
+        int bucket = freeSpace >>> shift;
+
+        assert bucket >= 0 && bucket < BUCKETS : bucket;
+
+        if (!allowReuse && isReuseBucket(bucket))
+            bucket--; // Fall back to the bucket just below the reuse bucket.
+
+        return bucket;
+    }
+
+    /**
+     * @param part Partition; must not exceed the maximum partition ID and must not be the index partition.
+     * @return ID of the page allocated with {@code PageIdAllocator.FLAG_DATA}.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long allocateDataPage(int part) throws IgniteCheckedException {
+        assert part <= PageIdAllocator.MAX_PARTITION_ID;
+        assert part != PageIdAllocator.INDEX_PARTITION;
+
+        return pageMem.allocatePage(grpId, part, PageIdAllocator.FLAG_DATA);
+    }
+
+    /** {@inheritDoc} Writes the row page by page until the write handler reports {@code COMPLETE}. */
+    @Override public void insertDataRow(T row) throws IgniteCheckedException {
+        int rowSize = ioVersions().latest().getRowSize(row);
+
+        int written = 0;
+
+        do {
+            if (written != 0)
+                memMetrics.incrementLargeEntriesPages(); // The row continues on one more page.
+
+            int freeSpace = Math.min(MIN_SIZE_FOR_DATA_PAGE, rowSize - written); // Space needed on the next page.
+
+            long pageId = 0L;
+
+            if (freeSpace == MIN_SIZE_FOR_DATA_PAGE)
+                pageId = takeEmptyPage(emptyDataPagesBucket, ioVersions());
+
+            boolean reuseBucket = false;
+
+            // TODO: properly handle reuse bucket (the loop below stops before REUSE_BUCKET, so reuseBucket stays false).
+            if (pageId == 0L) {
+                for (int b = bucket(freeSpace, false) + 1; b < BUCKETS - 1; b++) {
+                    pageId = takeEmptyPage(b, ioVersions());
+
+                    if (pageId != 0L) {
+                        reuseBucket = isReuseBucket(b);
+
+                        break;
+                    }
+                }
+            }
+
+            boolean allocated = pageId == 0L; // No page was found in the buckets.
+
+            if (allocated)
+                pageId = allocateDataPage(row.partition());
+            else
+                pageId = PageIdUtils.changePartitionId(pageId, (row.partition()));
+
+            AbstractDataPageIO<T> init = reuseBucket || allocated ? ioVersions().latest() : null;
+
+            written = write(pageId, writeRow, init, row, written, FAIL_I);
+
+            assert written != FAIL_I; // We can't fail here.
+        }
+        while (written != COMPLETE);
+    }
+
+    /** {@inheritDoc} Returns the result of the update handler run on the page addressed by the link. */
+    @Override public boolean updateDataRow(long link, T row) throws IgniteCheckedException {
+        assert link != 0;
+
+        long pageId = PageIdUtils.pageId(link);
+        int itemId = PageIdUtils.itemId(link);
+
+        Boolean updated = write(pageId, updateRow, row, itemId, null);
+
+        assert updated != null; // Can't fail here.
+
+        return updated;
+    }
+
+    /** {@inheritDoc} Follows the links returned by the remove handler until it returns {@code 0}. */
+    @Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
+        assert link != 0;
+
+        long pageId = PageIdUtils.pageId(link);
+        int itemId = PageIdUtils.itemId(link);
+
+        long nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+        assert nextLink != FAIL_L; // Can't fail here.
+
+        while (nextLink != 0L) {
+            memMetrics.decrementLargeEntriesPages(); // Mirrors the increment done in insertDataRow per extra page.
+
+            itemId = PageIdUtils.itemId(nextLink);
+            pageId = PageIdUtils.pageId(nextLink);
+
+            nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+            assert nextLink != FAIL_L; // Can't fail here.
+        }
+    }
+
+    /** {@inheritDoc} Reads the stripe array stored for the bucket; may be {@code null} for an unset slot. */
+    @Override protected Stripe[] getBucket(int bucket) {
+        return buckets.get(bucket);
+    }
+
+    /** {@inheritDoc} Performs an atomic compare-and-set on the bucket's slot. */
+    @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
+        return buckets.compareAndSet(bucket, exp, upd);
+    }
+
+    /** {@inheritDoc} Only the last bucket ({@code BUCKETS - 1}) is the reuse bucket. */
+    @Override protected boolean isReuseBucket(int bucket) {
+        return bucket == REUSE_BUCKET;
+    }
+
+    /**
+     * @return Number of empty data pages in free list (size counter of the empty data pages bucket as an int).
+     */
+    public int emptyDataPages() {
+        return bucketsSize[emptyDataPagesBucket].intValue();
+    }
+
+    /** {@inheritDoc} Puts the bag into the reuse bucket; asserts that this list is its own reuse list. */
+    @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
+        assert reuseList == this : "not allowed to be a reuse list";
+
+        put(bag, 0, 0, 0L, REUSE_BUCKET);
+    }
+
+    /** {@inheritDoc} Takes a page from the reuse bucket; asserts that this list is its own reuse list. */
+    @Override public long takeRecycledPage() throws IgniteCheckedException {
+        assert reuseList == this : "not allowed to be a reuse list";
+
+        return takeEmptyPage(REUSE_BUCKET, null);
+    }
+
+    /** {@inheritDoc} Counts pages stored in the reuse bucket; asserts that this list is its own reuse list. */
+    @Override public long recycledPagesCount() throws IgniteCheckedException {
+        assert reuseList == this : "not allowed to be a reuse list";
+
+        return storedPagesCount(REUSE_BUCKET);
+    }
+
+    /**
+     * @return IO versions of the data pages this free list works with; {@code latest()} is used to size rows.
+     */
+    public abstract IOVersions<? extends AbstractDataPageIO<T>> ioVersions();
+
+    /** {@inheritDoc} Prints only the free list name. */
+    @Override public String toString() {
+        return "FreeList [name=" + name + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
new file mode 100644
index 0000000..dc0c92e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+
+/**
+ * FreeList implementation for cache data rows ({@link CacheDataRow}) that uses {@link DataPageIO#VERSIONS}.
+ */
+public class CacheFreeListImpl extends AbstractFreeList<CacheDataRow> {
+    /**
+     * @param cacheId Cache id.
+     * @param name Name (for debug purpose).
+     * @param regionMetrics Region metrics.
+     * @param dataRegion Data region.
+     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
+     * @param wal Write ahead log manager.
+     * @param metaPageId Metadata page id.
+     * @param initNew {@code True} if new metadata should be initialized.
+     */
+    public CacheFreeListImpl(int cacheId, String name, DataRegionMetricsImpl regionMetrics, DataRegion dataRegion,
+        ReuseList reuseList,
+        IgniteWriteAheadLogManager wal, long metaPageId, boolean initNew) throws IgniteCheckedException {
+        super(cacheId, name, regionMetrics, dataRegion, reuseList, wal, metaPageId, initNew);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IOVersions<? extends AbstractDataPageIO<CacheDataRow>> ioVersions() {
+        return DataPageIO.VERSIONS;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "FreeList [name=" + name + ']'; // NOTE(review): same string as AbstractFreeList#toString - redundant?
+    }
+}


Mime
View raw message