ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [15/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core
Date Sun, 11 Jun 2017 20:03:41 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
deleted file mode 100644
index c0176f6..0000000
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
+++ /dev/null
@@ -1,1316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.database;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
-import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
-import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl;
-import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PageMetaIO;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionCountersIO;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO;
-import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseListImpl;
-import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Used when persistence enabled.
- */
-public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl implements DbCheckpointListener {
-    /** */
-    private MetaStore metaStore;
-
-    /** */
-    private ReuseListImpl reuseList;
-
-    /** {@inheritDoc} */
-    @Override protected void initDataStructures() throws IgniteCheckedException {
-        Metas metas = getOrAllocateCacheMetas();
-
-        RootPage reuseListRoot = metas.reuseListRoot;
-
-        reuseList = new ReuseListImpl(grp.groupId(),
-            grp.cacheOrGroupName(),
-            grp.memoryPolicy().pageMemory(),
-            ctx.wal(),
-            reuseListRoot.pageId().pageId(),
-            reuseListRoot.isAllocated());
-
-        RootPage metastoreRoot = metas.treeRoot;
-
-        metaStore = new MetadataStorage(grp.memoryPolicy().pageMemory(),
-            ctx.wal(),
-            globalRemoveId(),
-            grp.groupId(),
-            PageIdAllocator.INDEX_PARTITION,
-            PageIdAllocator.FLAG_IDX,
-            reuseList,
-            metastoreRoot.pageId().pageId(),
-            metastoreRoot.isAllocated());
-
-        ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
-        if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
-            ctx.database().checkpointReadLock();
-
-            try {
-                final String name = "PendingEntries";
-
-                RootPage pendingRootPage = metaStore.getOrAllocateForTree(name);
-
-                pendingEntries = new PendingEntriesTree(
-                    grp,
-                    name,
-                    grp.memoryPolicy().pageMemory(),
-                    pendingRootPage.pageId().pageId(),
-                    reuseList,
-                    pendingRootPage.isAllocated()
-                );
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheDataStore createCacheDataStore0(final int p)
-        throws IgniteCheckedException {
-        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database();
-
-        boolean exists = ctx.pageStore() != null
-            && ctx.pageStore().exists(grp.groupId(), p);
-
-        return new GridCacheDataStore(p, exists);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
-        assert grp.memoryPolicy().pageMemory() instanceof PageMemoryEx;
-
-        reuseList.saveMetadata();
-
-        boolean metaWasUpdated = false;
-
-        for (CacheDataStore store : partDataStores.values()) {
-            RowStore rowStore = store.rowStore();
-
-            if (rowStore == null)
-                continue;
-
-            metaWasUpdated |= saveStoreMetadata(store, ctx, !metaWasUpdated, false);
-        }
-    }
-
-    /**
-     * @param store Store to save metadata.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean saveStoreMetadata(CacheDataStore store, Context ctx, boolean saveMeta,
-        boolean beforeDestroy) throws IgniteCheckedException {
-        RowStore rowStore0 = store.rowStore();
-
-        boolean beforeSnapshot = ctx != null && ctx.nextSnapshot();
-
-        boolean wasSaveToMeta = false;
-
-        if (rowStore0 != null) {
-            FreeListImpl freeList = (FreeListImpl)rowStore0.freeList();
-
-            freeList.saveMetadata();
-
-            long updCntr = store.updateCounter();
-            int size = store.fullSize();
-            long rmvId = globalRemoveId().get();
-
-            PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
-            IgniteWriteAheadLogManager wal = this.ctx.wal();
-
-            if (size > 0 || updCntr > 0) {
-                int state = -1;
-
-                if (beforeDestroy)
-                    state = GridDhtPartitionState.EVICTED.ordinal();
-                else {
-                    // localPartition will not acquire writeLock here because create=false.
-                    GridDhtLocalPartition part = grp.topology().localPartition(store.partId(),
-                        AffinityTopologyVersion.NONE, false);
-
-                    if (part != null && part.state() != GridDhtPartitionState.EVICTED)
-                        state = part.state().ordinal();
-                }
-
-                // Do not save meta for evicted partitions on next checkpoints.
-                if (state == -1)
-                    return false;
-
-                int grpId = grp.groupId();
-                long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
-                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-
-                try {
-                    long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
-
-                    if (pageAddr == 0L) {
-                        U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage +
-                            ", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size +
-                            ", updCntr=" + updCntr + ", state=" + state + ']');
-
-                        return false;
-                    }
-
-                    boolean changed = false;
-
-                    try {
-                        PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
-
-                        changed |= io.setUpdateCounter(pageAddr, updCntr);
-                        changed |= io.setGlobalRemoveId(pageAddr, rmvId);
-                        changed |= io.setSize(pageAddr, size);
-
-                        changed |= io.setPartitionState(pageAddr, (byte)state);
-
-                        long cntrsPageId;
-
-                        if (grp.sharedGroup()) {
-                            cntrsPageId = io.getCountersPageId(pageAddr);
-
-                            byte[] data = serializeCacheSizes(store.cacheSizes());
-
-                            int items = data.length / 12;
-                            int written = 0;
-                            int pageSize = pageMem.pageSize();
-
-                            boolean init = cntrsPageId == 0;
-
-                            if (init) {
-                                cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA);
-
-                                io.setCountersPageId(pageAddr, cntrsPageId);
-
-                                changed = true;
-                            }
-
-                            long nextId = cntrsPageId;
-
-                            while (written != items) {
-                                final long curId = nextId;
-                                final long curPage = pageMem.acquirePage(grpId, curId);
-
-                                try {
-                                    final long curAddr = pageMem.writeLock(grpId, curId, curPage);
-
-                                    assert curAddr != 0;
-
-                                    try {
-                                        PagePartitionCountersIO partMetaIo;
-
-                                        if (init) {
-                                            partMetaIo = PagePartitionCountersIO.VERSIONS.latest();
-
-                                            partMetaIo.initNewPage(curAddr, curId, pageSize);
-                                        }
-                                        else
-                                            partMetaIo = PageIO.getPageIO(curAddr);
-
-                                        written += partMetaIo.writeCacheSizes(pageSize, curAddr, data, written);
-
-                                        nextId = partMetaIo.getNextCountersPageId(curAddr);
-
-                                        if(written != items && (init = nextId == 0)) {
-                                            //allocate new counters page
-                                            nextId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA);
-                                            partMetaIo.setNextCountersPageId(curAddr, nextId);
-                                        }
-                                    }
-                                    finally {
-                                        // Write full page
-                                        pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true);
-                                    }
-                                }
-                                finally {
-                                    pageMem.releasePage(grpId, curId, curPage);
-                                }
-                            }
-                        }
-                        else
-                            cntrsPageId = 0L;
-
-                        int pageCnt;
-
-                        if (beforeSnapshot) {
-                            pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
-                            io.setCandidatePageCount(pageAddr, pageCnt);
-
-                            if (saveMeta) {
-                                long metaPageId = pageMem.metaPageId(grpId);
-                                long metaPage = pageMem.acquirePage(grpId, metaPageId);
-
-                                try {
-                                    long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage);
-
-                                    try {
-                                        long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr);
-
-                                        io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1);
-
-                                        if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId,
-                                            metaPage, wal, null))
-                                            wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId,
-                                                nextSnapshotTag + 1));
-
-                                        addPartition(ctx.partitionStatMap(), metaPageAddr, io, grpId, PageIdAllocator.INDEX_PARTITION,
-                                            this.ctx.kernalContext().cache().context().pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION));
-                                    }
-                                    finally {
-                                        pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true);
-                                    }
-                                }
-                                finally {
-                                    pageMem.releasePage(grpId, metaPageId, metaPage);
-                                }
-
-                                wasSaveToMeta = true;
-                            }
-
-                            GridDhtPartitionMap partMap = grp.topology().localPartitionMap();
-
-                            if (partMap.containsKey(store.partId()) &&
-                                partMap.get(store.partId()) == GridDhtPartitionState.OWNING)
-                                addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(),
-                                    this.ctx.pageStore().pages(grpId, store.partId()));
-
-                            changed = true;
-                        }
-                        else
-                            pageCnt = io.getCandidatePageCount(pageAddr);
-
-                        if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
-                            wal.log(new MetaPageUpdatePartitionDataRecord(
-                                grpId,
-                                partMetaId,
-                                updCntr,
-                                rmvId,
-                                size,
-                                cntrsPageId,
-                                (byte)state,
-                                pageCnt
-                            ));
-                    }
-                    finally {
-                        pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
-                    }
-                }
-                finally {
-                    pageMem.releasePage(grpId, partMetaId, partMetaPage);
-                }
-            }
-        }
-
-        return wasSaveToMeta;
-    }
-
-    /**
-     * @param cacheSizes Cache sizes.
-     * @return Serialized cache sizes
-     */
-    private byte[] serializeCacheSizes(Map<Integer, Long> cacheSizes) {
-        // Item size = 4 bytes (cache ID) + 8 bytes (cache size) = 12 bytes
-        byte[] data = new byte[cacheSizes.size() * 12];
-        long off = GridUnsafe.BYTE_ARR_OFF;
-
-        for (Map.Entry<Integer, Long> entry : cacheSizes.entrySet()) {
-            GridUnsafe.putInt(data, off, entry.getKey()); off += 4;
-            GridUnsafe.putLong(data, off, entry.getValue()); off += 8;
-        }
-
-        return data;
-    }
-
-    /**
-     * @param map Map to add values to.
-     * @param pageAddr page address
-     * @param io Page Meta IO
-     * @param cacheId Cache ID.
-     * @param partition Partition ID.
-     * @param pages Number of pages to add.
-     */
-    private static void addPartition(
-        Map<T2<Integer, Integer>, T2<Integer, Integer>> map,
-        long pageAddr,
-        PagePartitionMetaIO io,
-        int cacheId,
-        int partition,
-        int pages
-    ) {
-        if (pages <= 1)
-            return;
-
-        assert PageIO.getPageId(pageAddr) != 0;
-
-        int lastAllocatedIdx = io.getLastPageCount(pageAddr);
-        map.put(new T2<>(cacheId, partition), new T2<>(lastAllocatedIdx, pages));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException {
-        ctx.database().checkpointReadLock();
-
-        try {
-            int p = store.partId();
-
-            saveStoreMetadata(store, null, false, true);
-
-            PageMemoryEx pageMemory = (PageMemoryEx)grp.memoryPolicy().pageMemory();
-
-            int tag = pageMemory.invalidate(grp.groupId(), p);
-
-            ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p));
-
-            ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
-        }
-        finally {
-            ctx.database().checkpointReadUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onPartitionCounterUpdated(int part, long cntr) {
-        CacheDataStore store = partDataStores.get(part);
-
-        assert store != null;
-
-        long oldCnt = store.updateCounter();
-
-        if (oldCnt < cntr)
-            store.updateCounter(cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onPartitionInitialCounterUpdated(int part, long cntr) {
-        CacheDataStore store = partDataStores.get(part);
-
-        assert store != null;
-
-        long oldCnt = store.initialUpdateCounter();
-
-        if (oldCnt < cntr)
-            store.updateInitialCounter(cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long lastUpdatedPartitionCounter(int part) {
-        return partDataStores.get(part).updateCounter();
-    }
-
-    /** {@inheritDoc} */
-    @Override public RootPage rootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException {
-        if (grp.sharedGroup())
-            idxName = Integer.toString(cacheId) + "_" + idxName;
-
-        return metaStore.getOrAllocateForTree(idxName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void dropRootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException {
-        if (grp.sharedGroup())
-            idxName = Integer.toString(cacheId) + "_" + idxName;
-
-        metaStore.dropRootPage(idxName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ReuseList reuseListForIndex(String idxName) {
-        return reuseList;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        if (grp.affinityNode())
-            ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this);
-    }
-
-    /**
-     * @return Meta root pages info.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
-        PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
-        IgniteWriteAheadLogManager wal = ctx.wal();
-
-        int grpId = grp.groupId();
-        long metaId = pageMem.metaPageId(grpId);
-        long metaPage = pageMem.acquirePage(grpId, metaId);
-
-        try {
-            final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage);
-
-            boolean allocated = false;
-
-            try {
-                long metastoreRoot, reuseListRoot;
-
-                if (PageIO.getType(pageAddr) != PageIO.T_META) {
-                    PageMetaIO pageIO = PageMetaIO.VERSIONS.latest();
-
-                    pageIO.initNewPage(pageAddr, metaId, pageMem.pageSize());
-
-                    metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
-                    reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
-
-                    pageIO.setTreeRoot(pageAddr, metastoreRoot);
-                    pageIO.setReuseListRoot(pageAddr, reuseListRoot);
-
-                    if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null))
-                        wal.log(new MetaPageInitRecord(
-                            grpId,
-                            metaId,
-                            pageIO.getType(),
-                            pageIO.getVersion(),
-                            metastoreRoot,
-                            reuseListRoot
-                        ));
-
-                    allocated = true;
-                }
-                else {
-                    PageMetaIO pageIO = PageIO.getPageIO(pageAddr);
-
-                    metastoreRoot = pageIO.getTreeRoot(pageAddr);
-                    reuseListRoot = pageIO.getReuseListRoot(pageAddr);
-
-                    assert reuseListRoot != 0L;
-                }
-
-                return new Metas(
-                    new RootPage(new FullPageId(metastoreRoot, grpId), allocated),
-                    new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
-            }
-            finally {
-                pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
-            }
-        }
-        finally {
-            pageMem.releasePage(grpId, metaId, metaPage);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer,
-        Long partCntrSince) throws IgniteCheckedException {
-        if (partCntrSince == null)
-            return super.rebalanceIterator(part, topVer, partCntrSince);
-
-        GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)ctx.database();
-
-        try {
-            WALPointer startPtr = database.searchPartitionCounter(grp.groupId(), part, partCntrSince);
-
-            if (startPtr == null) {
-                assert false : "partCntr=" + partCntrSince + ", reservations=" + S.toString(Map.class, database.reservedForPreloading());
-
-                return super.rebalanceIterator(part, topVer, partCntrSince);
-            }
-
-            WALIterator it = ctx.wal().replay(startPtr);
-
-            return new RebalanceIteratorAdapter(grp, it, part);
-        }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to create WAL-based rebalance iterator (a full partition will transferred to a " +
-                "remote node) [part=" + part + ", partCntrSince=" + partCntrSince + ", err=" + e + ']');
-
-            return super.rebalanceIterator(part, topVer, partCntrSince);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class RebalanceIteratorAdapter implements IgniteRebalanceIterator {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0L;
-
-        /** Cache group caches. */
-        private final Set<Integer> cacheGrpCaches;
-
-        /** WAL iterator. */
-        private final WALIterator walIt;
-
-        /** Partition to scan. */
-        private final int part;
-
-        /** */
-        private Iterator<DataEntry> entryIt;
-
-        /** */
-        private CacheDataRow next;
-
-        /**
-         * @param grp Cache group.
-         * @param walIt WAL iterator.
-         * @param part Partition ID.
-         */
-        private RebalanceIteratorAdapter(CacheGroupContext grp, WALIterator walIt, int part) {
-            this.cacheGrpCaches = grp.cacheIds();
-            this.walIt = walIt;
-            this.part = part;
-
-            advance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean historical() {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            walIt.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isClosed() {
-            return walIt.isClosed();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() {
-            return hasNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheDataRow nextX() throws IgniteCheckedException {
-            return next();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeX() throws IgniteCheckedException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<CacheDataRow> iterator() {
-            return this;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheDataRow next() {
-            if (next == null)
-                throw new NoSuchElementException();
-
-            CacheDataRow val = next;
-
-            advance();
-
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         *
-         */
-        private void advance() {
-            next = null;
-
-            while (true) {
-                if (entryIt != null) {
-                    while (entryIt.hasNext()) {
-                        DataEntry entry = entryIt.next();
-
-                        if (entry.partitionId() == part && cacheGrpCaches.contains(entry.cacheId())) {
-
-                            next = new DataEntryRow(entry);
-
-                            return;
-                        }
-                    }
-                }
-
-                entryIt = null;
-
-                while (walIt.hasNext()) {
-                    IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
-
-                    if (rec.get2() instanceof DataRecord) {
-                        DataRecord data = (DataRecord)rec.get2();
-
-                        entryIt = data.writeEntries().iterator();
-                        // Move on to the next valid data entry.
-
-                        break;
-                    }
-                }
-
-                if (entryIt == null)
-                    return;
-            }
-        }
-    }
-
-    /**
-     * Data entry row.
-     */
-    private static class DataEntryRow implements CacheDataRow {
-        /** */
-        private final DataEntry entry;
-
-        /**
-         * @param entry Data entry.
-         */
-        private DataEntryRow(DataEntry entry) {
-            this.entry = entry;
-        }
-
-        /** {@inheritDoc} */
-        @Override public KeyCacheObject key() {
-            return entry.key();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void key(KeyCacheObject key) {
-            throw new IllegalStateException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheObject value() {
-            return entry.value();
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheVersion version() {
-            return entry.writeVersion();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long expireTime() {
-            return entry.expireTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int partition() {
-            return entry.partitionId();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long link() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void link(long link) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hash() {
-            return entry.key().hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int cacheId() {
-            return entry.cacheId();
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Metas {
-        /** */
-        @GridToStringInclude
-        private final RootPage reuseListRoot;
-
-        /** */
-        @GridToStringInclude
-        private final RootPage treeRoot;
-
-        /**
-         * @param treeRoot Metadata storage root.
-         * @param reuseListRoot Reuse list root.
-         */
-        Metas(RootPage treeRoot, RootPage reuseListRoot) {
-            this.treeRoot = treeRoot;
-            this.reuseListRoot = reuseListRoot;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Metas.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private class GridCacheDataStore implements CacheDataStore {
-        /** */
-        private final int partId;
-
-        /** */
-        private String name;
-
-        /** */
-        private volatile FreeListImpl freeList;
-
-        /** */
-        private volatile CacheDataStore delegate;
-
-        /** */
-        private final boolean exists;
-
-        /** */
-        private final AtomicBoolean init = new AtomicBoolean();
-
-        /** */
-        private final CountDownLatch latch = new CountDownLatch(1);
-
-        /**
-         * @param partId Partition.
-         * @param exists {@code True} if store for this index exists.
-         */
-        private GridCacheDataStore(int partId, boolean exists) {
-            this.partId = partId;
-            this.exists = exists;
-
-            name = treeName(partId);
-        }
-
-        /**
-         * @return Store delegate.
-         * @throws IgniteCheckedException If failed.
-         */
-        private CacheDataStore init0(boolean checkExists) throws IgniteCheckedException {
-            CacheDataStore delegate0 = delegate;
-
-            if (delegate0 != null)
-                return delegate0;
-
-            if (checkExists) {
-                if (!exists)
-                    return null;
-            }
-
-            IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
-
-            dbMgr.checkpointReadLock();
-
-            if (init.compareAndSet(false, true)) {
-                try {
-                    Metas metas = getOrAllocatePartitionMetas();
-
-                    RootPage reuseRoot = metas.reuseListRoot;
-
-                    freeList = new FreeListImpl(
-                        grp.groupId(),
-                        grp.cacheOrGroupName() + "-" + partId,
-                        grp.memoryPolicy().memoryMetrics(),
-                        grp.memoryPolicy(),
-                        null,
-                        ctx.wal(),
-                        reuseRoot.pageId().pageId(),
-                        reuseRoot.isAllocated()) {
-                        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
-                            return pageMem.allocatePage(cacheId, partId, PageIdAllocator.FLAG_DATA);
-                        }
-                    };
-
-                    CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId);
-
-                    RootPage treeRoot = metas.treeRoot;
-
-                    CacheDataTree dataTree = new CacheDataTree(
-                        grp,
-                        name,
-                        freeList,
-                        rowStore,
-                        treeRoot.pageId().pageId(),
-                        treeRoot.isAllocated()) {
-                        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
-                            return pageMem.allocatePage(cacheId, partId, PageIdAllocator.FLAG_DATA);
-                        }
-                    };
-
-                    PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
-
-                    delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree);
-
-                    int grpId = grp.groupId();
-                    long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
-                    long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-
-                    try {
-                        long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage);
-
-                        try {
-                            if (PageIO.getType(pageAddr) != 0) {
-                                PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
-
-                                Map<Integer, Long> cacheSizes = null;
-
-                                if (grp.sharedGroup()) {
-                                    long cntrsPageId = io.getCountersPageId(pageAddr);
-
-                                    if (cntrsPageId != 0L) {
-                                        cacheSizes = new HashMap<>();
-
-                                        long nextId = cntrsPageId;
-
-                                        while (true){
-                                            final long curId = nextId;
-                                            final long curPage = pageMem.acquirePage(grpId, curId);
-
-                                            try {
-                                                final long curAddr = pageMem.readLock(grpId, curId, curPage);
-
-                                                assert curAddr != 0;
-
-                                                try {
-                                                    PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr);
-
-                                                    if (cntrsIO.readCacheSizes(curAddr, cacheSizes))
-                                                        break;
-
-                                                    nextId = cntrsIO.getNextCountersPageId(curAddr);
-
-                                                    assert nextId != 0;
-                                                }
-                                                finally {
-                                                    pageMem.readUnlock(grpId, curId, curPage);
-                                                }
-                                            }
-                                            finally {
-                                                pageMem.releasePage(grpId, curId, curPage);
-                                            }
-                                        }
-                                    }
-                                }
-
-                                delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes);
-
-                                globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
-                            }
-                        }
-                        finally {
-                            pageMem.readUnlock(grpId, partMetaId, partMetaPage);
-                        }
-                    }
-                    finally {
-                        pageMem.releasePage(grpId, partMetaId, partMetaPage);
-                    }
-
-                    delegate = delegate0;
-                }
-                finally {
-                    latch.countDown();
-
-                    dbMgr.checkpointReadUnlock();
-                }
-            }
-            else {
-                dbMgr.checkpointReadUnlock();
-
-                U.await(latch);
-
-                delegate0 = delegate;
-
-                if (delegate0 == null)
-                    throw new IgniteCheckedException("Cache store initialization failed.");
-            }
-
-            return delegate0;
-        }
-
-        /**
-         * @return Partition metas.
-         */
-        private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException {
-            PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
-            IgniteWriteAheadLogManager wal = ctx.wal();
-
-            int grpId = grp.groupId();
-            long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
-            long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-            try {
-                boolean allocated = false;
-                long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
-
-                try {
-                    long treeRoot, reuseListRoot;
-
-                    // Initialize new page.
-                    if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
-                        PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
-
-                        io.initNewPage(pageAddr, partMetaId, pageMem.pageSize());
-
-                        treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
-                        reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
-
-                        assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
-                        assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
-
-                        io.setTreeRoot(pageAddr, treeRoot);
-                        io.setReuseListRoot(pageAddr, reuseListRoot);
-
-                        if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
-                            wal.log(new MetaPageInitRecord(
-                                grpId,
-                                partMetaId,
-                                io.getType(),
-                                io.getVersion(),
-                                treeRoot,
-                                reuseListRoot
-                            ));
-
-                        allocated = true;
-                    }
-                    else {
-                        PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
-
-                        treeRoot = io.getTreeRoot(pageAddr);
-                        reuseListRoot = io.getReuseListRoot(pageAddr);
-
-                        assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
-                            U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId;
-                        assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
-                            U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId;
-                    }
-
-                    return new Metas(
-                        new RootPage(new FullPageId(treeRoot, grpId), allocated),
-                        new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
-                }
-                finally {
-                    pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated);
-                }
-            }
-            finally {
-                pageMem.releasePage(grpId, partMetaId, partMetaPage);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public int partId() {
-            return partId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public RowStore rowStore() {
-            CacheDataStore delegate0 = delegate;
-
-            return delegate0 == null ? null : delegate0.rowStore();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int fullSize() {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                return delegate0 == null ? 0 : delegate0.fullSize();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public int cacheSize(int cacheId) {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                return delegate0 == null ? 0 : delegate0.cacheSize(cacheId);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<Integer, Long> cacheSizes() {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                return delegate0 == null ? null : delegate0.cacheSizes();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public long updateCounter() {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                return delegate0 == null ? 0 : delegate0.updateCounter();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) {
-            throw new IllegalStateException("Should be never called.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void updateCounter(long val) {
-            try {
-                CacheDataStore delegate0 = init0(false);
-
-                if (delegate0 != null)
-                    delegate0.updateCounter(val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public long nextUpdateCounter() {
-            try {
-                CacheDataStore delegate0 = init0(false);
-
-                return delegate0 == null ? 0 : delegate0.nextUpdateCounter();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public Long initialUpdateCounter() {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                return delegate0 == null ? 0 : delegate0.initialUpdateCounter();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void updateInitialCounter(long cntr) {
-            try {
-                CacheDataStore delegate0 = init0(true);
-
-                if (delegate0 != null)
-                    delegate0.updateInitialCounter(cntr);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void update(
-            GridCacheContext cctx,
-            KeyCacheObject key,
-            CacheObject val,
-            GridCacheVersion ver,
-            long expireTime,
-            @Nullable CacheDataRow oldRow
-        ) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            delegate.update(cctx, key, val, ver, expireTime, oldRow);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            delegate.updateIndexes(cctx, key);
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheDataRow createRow(
-            GridCacheContext cctx,
-            KeyCacheObject key,
-            CacheObject val,
-            GridCacheVersion ver,
-            long expireTime,
-            @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
-            throws IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            delegate.invoke(cctx, key, c);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
-            throws IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            delegate.remove(cctx, key, partId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                return delegate.find(cctx, key);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                return delegate.cursor();
-
-            return EMPTY_CURSOR;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(
-            int cacheId,
-            KeyCacheObject lower,
-            KeyCacheObject upper) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                return delegate.cursor(cacheId, lower, upper);
-
-            return EMPTY_CURSOR;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
-            KeyCacheObject lower,
-            KeyCacheObject upper,
-            Object x)
-            throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                return delegate.cursor(cacheId, lower, upper, x);
-
-            return EMPTY_CURSOR;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void destroy() throws IgniteCheckedException {
-            // No need to destroy delegate.
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                return delegate.cursor(cacheId);
-
-            return EMPTY_CURSOR;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void clear(int cacheId) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
-
-            if (delegate != null)
-                delegate.clear(cacheId);
-        }
-    }
-
-    /**
-     *
-     */
-    private static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() {
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheDataRow get() {
-            return null;
-        }
-    };
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
deleted file mode 100644
index 25d95ae..0000000
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.database;
-
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
-import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
-
-/**
- *
- */
-public class PersistenceMetricsImpl implements PersistenceMetricsMXBean {
-    /** */
-    private volatile HitRateMetrics walLoggingRate;
-
-    /** */
-    private volatile HitRateMetrics walWritingRate;
-
-    /** */
-    private volatile HitRateMetrics walFsyncTimeDuration;
-
-    /** */
-    private volatile HitRateMetrics walFsyncTimeNumber;
-
-    /** */
-    private volatile long lastCpLockWaitDuration;
-
-    /** */
-    private volatile long lastCpMarkDuration;
-
-    /** */
-    private volatile long lastCpPagesWriteDuration;
-
-    /** */
-    private volatile long lastCpDuration;
-
-    /** */
-    private volatile long lastCpFsyncDuration;
-
-    /** */
-    private volatile long lastCpTotalPages;
-
-    /** */
-    private volatile long lastCpDataPages;
-
-    /** */
-    private volatile long lastCpCowPages;
-
-    /** */
-    private volatile long rateTimeInterval;
-
-    /** */
-    private volatile int subInts;
-
-    /** */
-    private volatile boolean metricsEnabled;
-
-    /** */
-    private IgniteWriteAheadLogManager wal;
-
-    /**
-     * @param metricsEnabled Metrics enabled flag.
-     * @param rateTimeInterval Rate time interval.
-     * @param subInts Number of sub-intervals.
-     */
-    public PersistenceMetricsImpl(
-        boolean metricsEnabled,
-        long rateTimeInterval,
-        int subInts
-    ) {
-        this.metricsEnabled = metricsEnabled;
-        this.rateTimeInterval = rateTimeInterval;
-        this.subInts = subInts;
-
-        resetRates();
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getWalLoggingRate() {
-        if (!metricsEnabled)
-            return 0;
-
-        return ((float)walLoggingRate.getRate()) / rateTimeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getWalWritingRate() {
-        if (!metricsEnabled)
-            return 0;
-
-        return ((float)walWritingRate.getRate()) / rateTimeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getWalArchiveSegments() {
-        if (!metricsEnabled)
-            return 0;
-
-        return wal.walArchiveSegments();
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getWalFsyncTimeAverage() {
-        if (!metricsEnabled)
-            return 0;
-
-        long numRate = walFsyncTimeNumber.getRate();
-
-        if (numRate == 0)
-            return 0;
-
-        return (float)walFsyncTimeDuration.getRate() / numRate;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointingDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointLockWaitDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpLockWaitDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointMarkDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpMarkDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointPagesWriteDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpPagesWriteDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointFsyncDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpFsyncDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointTotalPagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpTotalPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointDataPagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpDataPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointCopiedOnWritePagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpCowPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void enableMetrics() {
-        metricsEnabled = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void disableMetrics() {
-        metricsEnabled = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void rateTimeInterval(long rateTimeInterval) {
-        this.rateTimeInterval = rateTimeInterval;
-
-        resetRates();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void subIntervals(int subInts) {
-        this.subInts = subInts;
-
-        resetRates();
-    }
-
-    /**
-     * @param wal Write-ahead log manager.
-     */
-    public void wal(IgniteWriteAheadLogManager wal) {
-        this.wal = wal;
-    }
-
-    /**
-     * @return Metrics enabled flag.
-     */
-    public boolean metricsEnabled() {
-        return metricsEnabled;
-    }
-
-    /**
-     * @param lockWaitDuration Lock wait duration.
-     * @param markDuration Mark duration.
-     * @param pagesWriteDuration Pages write duration.
-     * @param fsyncDuration Total checkpoint fsync duration.
-     * @param duration Total checkpoint duration.
-     * @param totalPages Total number of all pages in checkpoint.
-     * @param dataPages Total number of data pages in checkpoint.
-     * @param cowPages Total number of COW-ed pages in checkpoint.
-     */
-    public void onCheckpoint(
-        long lockWaitDuration,
-        long markDuration,
-        long pagesWriteDuration,
-        long fsyncDuration,
-        long duration,
-        long totalPages,
-        long dataPages,
-        long cowPages
-    ) {
-        if (metricsEnabled) {
-            lastCpLockWaitDuration = lockWaitDuration;
-            lastCpMarkDuration = markDuration;
-            lastCpPagesWriteDuration = pagesWriteDuration;
-            lastCpFsyncDuration = fsyncDuration;
-            lastCpDuration = duration;
-            lastCpTotalPages = totalPages;
-            lastCpDataPages = dataPages;
-            lastCpCowPages = cowPages;
-        }
-    }
-
-    /**
-     *
-     */
-    public void onWalRecordLogged() {
-        walLoggingRate.onHit();
-    }
-
-    /**
-     * @param size Size written.
-     */
-    public void onWalBytesWritten(int size) {
-        walWritingRate.onHits(size);
-    }
-
-    /**
-     * @param nanoTime Fsync nano time.
-     */
-    public void onFsync(long nanoTime) {
-        long microseconds = nanoTime / 1_000;
-
-        walFsyncTimeDuration.onHits(microseconds);
-        walFsyncTimeNumber.onHit();
-    }
-
-    /**
-     *
-     */
-    private void resetRates() {
-        walLoggingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
-        walWritingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
-
-        walFsyncTimeDuration = new HitRateMetrics((int)rateTimeInterval, subInts);
-        walFsyncTimeNumber = new HitRateMetrics((int)rateTimeInterval, subInts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java
deleted file mode 100755
index 2042358..0000000
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.database.file;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.MemoryConfiguration;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.store.PageStore;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.database.wal.crc.IgniteDataIntegrityViolationException;
-import org.apache.ignite.internal.processors.cache.database.wal.crc.PureJavaCrc32;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
-
-/**
- * File page store.
- */
-public class FilePageStore implements PageStore {
-    /** Page store file signature. */
-    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
-
-    /** File version. */
-    private static final int VERSION = 1;
-
-    /** Allocated field offset. */
-    public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
-
-    /** */
-    private final File cfgFile;
-
-    /** */
-    private final byte type;
-
-    /** Database configuration. */
-    private final MemoryConfiguration dbCfg;
-
-    /** */
-    private RandomAccessFile file;
-
-    /** */
-    private FileChannel ch;
-
-    /** */
-    private final AtomicLong allocated;
-
-    /** */
-    private final int pageSize;
-
-    /** */
-    private volatile boolean inited;
-
-    /** */
-    private volatile boolean recover;
-
-    /** */
-    private volatile int tag;
-
-    /** */
-    private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
-
-    /** */
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    /**
-     * @param file File.
-     */
-    public FilePageStore(byte type, File file, MemoryConfiguration cfg) {
-        this.type = type;
-
-        cfgFile = file;
-        dbCfg = cfg;
-
-        allocated = new AtomicLong();
-
-        pageSize = dbCfg.getPageSize();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exists() {
-        return cfgFile.exists() && cfgFile.length() > HEADER_SIZE;
-    }
-
-    /**
-     * @param type Type.
-     * @param pageSize Page size.
-     * @return Byte buffer instance.
-     */
-    public static ByteBuffer header(byte type, int pageSize) {
-        ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
-
-        hdr.putLong(SIGNATURE);
-
-        hdr.putInt(VERSION);
-
-        hdr.put(type);
-
-        hdr.putInt(pageSize);
-
-        hdr.rewind();
-
-        return hdr;
-    }
-
-    /**
-     *
-     */
-    private long initFile() {
-        try {
-            ByteBuffer hdr = header(type, dbCfg.getPageSize());
-
-            while (hdr.remaining() > 0)
-                ch.write(hdr);
-        }
-        catch (IOException e) {
-            throw new IgniteException("Check file failed.", e);
-        }
-
-        //there is 'super' page in every file
-        return HEADER_SIZE + dbCfg.getPageSize();
-    }
-
-    /**
-     *
-     */
-    private long checkFile() throws IgniteCheckedException {
-        try {
-            ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
-
-            while (hdr.remaining() > 0)
-                ch.read(hdr);
-
-            hdr.rewind();
-
-            long signature = hdr.getLong();
-
-            if (SIGNATURE != signature)
-                throw new IgniteCheckedException("Failed to verify store file (invalid file signature)" +
-                    " [expectedSignature=" + U.hexLong(SIGNATURE) +
-                    ", actualSignature=" + U.hexLong(signature) + ']');
-
-            int ver = hdr.getInt();
-
-            if (VERSION != ver)
-                throw new IgniteCheckedException("Failed to verify store file (invalid file version)" +
-                    " [expectedVersion=" + VERSION +
-                    ", fileVersion=" + ver + "]");
-
-            byte type = hdr.get();
-
-            if (this.type != type)
-                throw new IgniteCheckedException("Failed to verify store file (invalid file type)" +
-                    " [expectedFileType=" + this.type +
-                    ", actualFileType=" + type + "]");
-
-            int pageSize = hdr.getInt();
-
-            if (dbCfg.getPageSize() != pageSize)
-                throw new IgniteCheckedException("Failed to verify store file (invalid page size)" +
-                    " [expectedPageSize=" + dbCfg.getPageSize() +
-                    ", filePageSize=" + pageSize + "]");
-
-            long fileSize = file.length();
-
-            if (fileSize == HEADER_SIZE) // Every file has a special meta page.
-                fileSize = pageSize + HEADER_SIZE;
-
-            if ((fileSize - HEADER_SIZE) % pageSize != 0)
-                throw new IgniteCheckedException("Failed to verify store file (invalid file size)" +
-                    " [fileSize=" + U.hexLong(fileSize) +
-                    ", pageSize=" + U.hexLong(pageSize) + ']');
-
-            return fileSize;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("File check failed", e);
-        }
-    }
-
-    /**
-     * @param cleanFile {@code True} to delete file.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void stop(boolean cleanFile) throws IgniteCheckedException {
-        lock.writeLock().lock();
-
-        try {
-            if (!inited)
-                return;
-
-            ch.force(false);
-
-            file.close();
-
-            if (cleanFile)
-                cfgFile.delete();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     *
-     */
-    public void truncate(int tag) throws IgniteCheckedException {
-        lock.writeLock().lock();
-
-        try {
-            if (!inited)
-                return;
-
-            this.tag = tag;
-
-            ch.position(0);
-
-            file.setLength(0);
-
-            allocated.set(initFile());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     *
-     */
-    public void beginRecover() {
-        lock.writeLock().lock();
-
-        try {
-            recover = true;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     *
-     */
-    public void finishRecover() {
-        lock.writeLock().lock();
-
-        try {
-            if (inited)
-                allocated.set(ch.size());
-
-            recover = false;
-        }
-        catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
-        init();
-
-        try {
-            long off = pageOffset(pageId);
-
-            assert pageBuf.capacity() == pageSize;
-            assert pageBuf.position() == 0;
-            assert pageBuf.order() == ByteOrder.nativeOrder();
-
-            int len = pageSize;
-
-            do {
-                int n = ch.read(pageBuf, off);
-
-                // If page was not written yet, nothing to read.
-                if (n < 0) {
-                    pageBuf.put(new byte[pageBuf.remaining()]);
-
-                    return;
-                }
-
-                off += n;
-
-                len -= n;
-            }
-            while (len > 0);
-
-            int savedCrc32 = PageIO.getCrc(pageBuf);
-
-            PageIO.setCrc(pageBuf, 0);
-
-            pageBuf.position(0);
-
-            if (!skipCrc) {
-                int curCrc32 = PureJavaCrc32.calcCrc32(pageBuf, pageSize);
-
-                if ((savedCrc32 ^ curCrc32) != 0)
-                    throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
-                        "[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) +
-                        ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() +
-                        ", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]");
-            }
-
-            assert PageIO.getCrc(pageBuf) == 0;
-
-            if (keepCrc)
-                PageIO.setCrc(pageBuf, savedCrc32);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Read error", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readHeader(ByteBuffer buf) throws IgniteCheckedException {
-        init();
-
-        try {
-            assert buf.remaining() == HEADER_SIZE;
-
-            int len = HEADER_SIZE;
-
-            long off = 0;
-
-            do {
-                int n = ch.read(buf, off);
-
-                // If page was not written yet, nothing to read.
-                if (n < 0)
-                    return;
-
-                off += n;
-
-                len -= n;
-            }
-            while (len > 0);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Read error", e);
-        }
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed to initialize store file.
-     */
-    private void init() throws IgniteCheckedException {
-        if (!inited) {
-            lock.writeLock().lock();
-
-            try {
-                if (!inited) {
-                    RandomAccessFile rndFile = null;
-
-                    IgniteCheckedException err = null;
-
-                    try {
-                        file = rndFile = new RandomAccessFile(cfgFile, "rw");
-
-                        ch = file.getChannel();
-
-                        if (file.length() == 0)
-                            allocated.set(initFile());
-                        else
-                            allocated.set(checkFile());
-
-                        inited = true;
-                    }
-                    catch (IOException e) {
-                        throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e);
-                    }
-                    finally {
-                        if (err != null && rndFile != null)
-                            try {
-                                rndFile.close();
-                            }
-                            catch (IOException e) {
-                                err.addSuppressed(e);
-                            }
-                    }
-                }
-            }
-            finally {
-                lock.writeLock().unlock();
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
-        init();
-
-        lock.readLock().lock();
-
-        try {
-            if (tag < this.tag)
-                return;
-
-            long off = pageOffset(pageId);
-
-            assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover :
-                "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId);
-
-            assert pageBuf.capacity() == pageSize;
-            assert pageBuf.position() == 0;
-            assert pageBuf.order() == ByteOrder.nativeOrder();
-            assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId);
-
-            int crc32 = skipCrc ? 0 : PureJavaCrc32.calcCrc32(pageBuf, pageSize);
-
-            PageIO.setCrc(pageBuf, crc32);
-
-            pageBuf.position(0);
-
-            int len = pageSize;
-
-            do {
-                int n = ch.write(pageBuf, off);
-
-                off += n;
-
-                len -= n;
-            }
-            while (len > 0);
-
-            PageIO.setCrc(pageBuf, 0);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to write the page to the file store [pageId=" + pageId +
-                ", file=" + cfgFile.getAbsolutePath() + ']', e);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long pageOffset(long pageId) {
-        return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sync() throws IgniteCheckedException {
-        lock.writeLock().lock();
-
-        try {
-            init();
-
-            ch.force(false);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Sync error", e);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void ensure() throws IgniteCheckedException {
-        init();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long allocatePage() throws IgniteCheckedException {
-        init();
-
-        long off = allocPage();
-
-        return off / pageSize;
-    }
-
-    /**
-     *
-     */
-    private long allocPage() {
-        long off;
-
-        do {
-            off = allocated.get();
-
-            if (allocated.compareAndSet(off, off + pageSize))
-                break;
-        }
-        while (true);
-
-        return off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int pages() {
-        if (!inited)
-            return 0;
-
-        return (int)(allocated.get() / pageSize);
-    }
-}


Mime
View raw message