Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3DD0F200CCB for ; Sun, 11 Jun 2017 22:03:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3C736160BD8; Sun, 11 Jun 2017 20:03:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4627F160BF2 for ; Sun, 11 Jun 2017 22:03:31 +0200 (CEST) Received: (qmail 60564 invoked by uid 500); 11 Jun 2017 20:03:30 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 59955 invoked by uid 99); 11 Jun 2017 20:03:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Jun 2017 20:03:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B8CC4F4A5F; Sun, 11 Jun 2017 20:03:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Sun, 11 Jun 2017 20:03:41 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core archived-at: Sun, 11 Jun 2017 20:03:34 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java deleted file mode 100644 index c0176f6..0000000 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java +++ /dev/null @@ -1,1316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.database; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.PageIdAllocator; -import org.apache.ignite.internal.pagemem.PageIdUtils; -import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.pagemem.wal.WALIterator; -import org.apache.ignite.internal.pagemem.wal.WALPointer; -import org.apache.ignite.internal.pagemem.wal.record.DataEntry; -import org.apache.ignite.internal.pagemem.wal.record.DataRecord; -import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; -import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl; -import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx; -import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.database.tree.io.PageMetaIO; -import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionCountersIO; -import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO; -import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; -import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseListImpl; -import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.lang.GridCursor; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; - -/** - * Used when persistence enabled. - */ -public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl implements DbCheckpointListener { - /** */ - private MetaStore metaStore; - - /** */ - private ReuseListImpl reuseList; - - /** {@inheritDoc} */ - @Override protected void initDataStructures() throws IgniteCheckedException { - Metas metas = getOrAllocateCacheMetas(); - - RootPage reuseListRoot = metas.reuseListRoot; - - reuseList = new ReuseListImpl(grp.groupId(), - grp.cacheOrGroupName(), - grp.memoryPolicy().pageMemory(), - ctx.wal(), - reuseListRoot.pageId().pageId(), - reuseListRoot.isAllocated()); - - RootPage metastoreRoot = metas.treeRoot; - - metaStore = new MetadataStorage(grp.memoryPolicy().pageMemory(), - ctx.wal(), - globalRemoveId(), - grp.groupId(), - PageIdAllocator.INDEX_PARTITION, - PageIdAllocator.FLAG_IDX, - reuseList, - metastoreRoot.pageId().pageId(), - metastoreRoot.isAllocated()); - - ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this); - } - - /** {@inheritDoc} */ - @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { - if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { - ctx.database().checkpointReadLock(); - - try { - final String name = "PendingEntries"; - - RootPage pendingRootPage = metaStore.getOrAllocateForTree(name); - - pendingEntries = new PendingEntriesTree( - grp, - name, - grp.memoryPolicy().pageMemory(), - pendingRootPage.pageId().pageId(), - reuseList, - pendingRootPage.isAllocated() - ); - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - } - - /** {@inheritDoc} */ - @Override protected CacheDataStore createCacheDataStore0(final int p) - throws IgniteCheckedException { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database(); - - boolean exists = ctx.pageStore() != null - && ctx.pageStore().exists(grp.groupId(), p); - - return new GridCacheDataStore(p, exists); - } - - /** {@inheritDoc} */ - @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { - assert grp.memoryPolicy().pageMemory() instanceof PageMemoryEx; - - reuseList.saveMetadata(); - - boolean metaWasUpdated = false; - - for (CacheDataStore store : partDataStores.values()) { - RowStore rowStore = store.rowStore(); - - if (rowStore == null) - continue; - - metaWasUpdated |= saveStoreMetadata(store, ctx, !metaWasUpdated, false); - } - } - - /** - * @param store Store to save metadata. - * @throws IgniteCheckedException If failed. - */ - private boolean saveStoreMetadata(CacheDataStore store, Context ctx, boolean saveMeta, - boolean beforeDestroy) throws IgniteCheckedException { - RowStore rowStore0 = store.rowStore(); - - boolean beforeSnapshot = ctx != null && ctx.nextSnapshot(); - - boolean wasSaveToMeta = false; - - if (rowStore0 != null) { - FreeListImpl freeList = (FreeListImpl)rowStore0.freeList(); - - freeList.saveMetadata(); - - long updCntr = store.updateCounter(); - int size = store.fullSize(); - long rmvId = globalRemoveId().get(); - - PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = this.ctx.wal(); - - if (size > 0 || updCntr > 0) { - int state = -1; - - if (beforeDestroy) - state = GridDhtPartitionState.EVICTED.ordinal(); - else { - // localPartition will not acquire writeLock here because create=false. - GridDhtLocalPartition part = grp.topology().localPartition(store.partId(), - AffinityTopologyVersion.NONE, false); - - if (part != null && part.state() != GridDhtPartitionState.EVICTED) - state = part.state().ordinal(); - } - - // Do not save meta for evicted partitions on next checkpoints. - if (state == -1) - return false; - - int grpId = grp.groupId(); - long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); - - try { - long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); - - if (pageAddr == 0L) { - U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage + - ", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size + - ", updCntr=" + updCntr + ", state=" + state + ']'); - - return false; - } - - boolean changed = false; - - try { - PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); - - changed |= io.setUpdateCounter(pageAddr, updCntr); - changed |= io.setGlobalRemoveId(pageAddr, rmvId); - changed |= io.setSize(pageAddr, size); - - changed |= io.setPartitionState(pageAddr, (byte)state); - - long cntrsPageId; - - if (grp.sharedGroup()) { - cntrsPageId = io.getCountersPageId(pageAddr); - - byte[] data = serializeCacheSizes(store.cacheSizes()); - - int items = data.length / 12; - int written = 0; - int pageSize = pageMem.pageSize(); - - boolean init = cntrsPageId == 0; - - if (init) { - cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); - - io.setCountersPageId(pageAddr, cntrsPageId); - - changed = true; - } - - long nextId = cntrsPageId; - - while (written != items) { - final long curId = nextId; - final long curPage = pageMem.acquirePage(grpId, curId); - - try { - final long curAddr = pageMem.writeLock(grpId, curId, curPage); - - assert curAddr != 0; - - try { - PagePartitionCountersIO partMetaIo; - - if (init) { - partMetaIo = PagePartitionCountersIO.VERSIONS.latest(); - - partMetaIo.initNewPage(curAddr, curId, pageSize); - } - else - partMetaIo = PageIO.getPageIO(curAddr); - - written += partMetaIo.writeCacheSizes(pageSize, curAddr, data, written); - - nextId = partMetaIo.getNextCountersPageId(curAddr); - - if(written != items && (init = nextId == 0)) { - //allocate new counters page - nextId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); - partMetaIo.setNextCountersPageId(curAddr, nextId); - } - } - finally { - // Write full page - pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true); - } - } - finally { - pageMem.releasePage(grpId, curId, curPage); - } - } - } - else - cntrsPageId = 0L; - - int pageCnt; - - if (beforeSnapshot) { - pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); - io.setCandidatePageCount(pageAddr, pageCnt); - - if (saveMeta) { - long metaPageId = pageMem.metaPageId(grpId); - long metaPage = pageMem.acquirePage(grpId, metaPageId); - - try { - long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); - - try { - long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr); - - io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1); - - if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, - metaPage, wal, null)) - wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId, - nextSnapshotTag + 1)); - - addPartition(ctx.partitionStatMap(), metaPageAddr, io, grpId, PageIdAllocator.INDEX_PARTITION, - this.ctx.kernalContext().cache().context().pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION)); - } - finally { - pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true); - } - } - finally { - pageMem.releasePage(grpId, metaPageId, metaPage); - } - - wasSaveToMeta = true; - } - - GridDhtPartitionMap partMap = grp.topology().localPartitionMap(); - - if (partMap.containsKey(store.partId()) && - partMap.get(store.partId()) == GridDhtPartitionState.OWNING) - addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(), - this.ctx.pageStore().pages(grpId, store.partId())); - - changed = true; - } - else - pageCnt = io.getCandidatePageCount(pageAddr); - - if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) - wal.log(new MetaPageUpdatePartitionDataRecord( - grpId, - partMetaId, - updCntr, - rmvId, - size, - cntrsPageId, - (byte)state, - pageCnt - )); - } - finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - } - } - - return wasSaveToMeta; - } - - /** - * @param cacheSizes Cache sizes. - * @return Serialized cache sizes - */ - private byte[] serializeCacheSizes(Map cacheSizes) { - // Item size = 4 bytes (cache ID) + 8 bytes (cache size) = 12 bytes - byte[] data = new byte[cacheSizes.size() * 12]; - long off = GridUnsafe.BYTE_ARR_OFF; - - for (Map.Entry entry : cacheSizes.entrySet()) { - GridUnsafe.putInt(data, off, entry.getKey()); off += 4; - GridUnsafe.putLong(data, off, entry.getValue()); off += 8; - } - - return data; - } - - /** - * @param map Map to add values to. - * @param pageAddr page address - * @param io Page Meta IO - * @param cacheId Cache ID. - * @param partition Partition ID. - * @param pages Number of pages to add. - */ - private static void addPartition( - Map, T2> map, - long pageAddr, - PagePartitionMetaIO io, - int cacheId, - int partition, - int pages - ) { - if (pages <= 1) - return; - - assert PageIO.getPageId(pageAddr) != 0; - - int lastAllocatedIdx = io.getLastPageCount(pageAddr); - map.put(new T2<>(cacheId, partition), new T2<>(lastAllocatedIdx, pages)); - } - - /** {@inheritDoc} */ - @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { - ctx.database().checkpointReadLock(); - - try { - int p = store.partId(); - - saveStoreMetadata(store, null, false, true); - - PageMemoryEx pageMemory = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - - int tag = pageMemory.invalidate(grp.groupId(), p); - - ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p)); - - ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag); - } - finally { - ctx.database().checkpointReadUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void onPartitionCounterUpdated(int part, long cntr) { - CacheDataStore store = partDataStores.get(part); - - assert store != null; - - long oldCnt = store.updateCounter(); - - if (oldCnt < cntr) - store.updateCounter(cntr); - } - - /** {@inheritDoc} */ - @Override public void onPartitionInitialCounterUpdated(int part, long cntr) { - CacheDataStore store = partDataStores.get(part); - - assert store != null; - - long oldCnt = store.initialUpdateCounter(); - - if (oldCnt < cntr) - store.updateInitialCounter(cntr); - } - - /** {@inheritDoc} */ - @Override public long lastUpdatedPartitionCounter(int part) { - return partDataStores.get(part).updateCounter(); - } - - /** {@inheritDoc} */ - @Override public RootPage rootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { - if (grp.sharedGroup()) - idxName = Integer.toString(cacheId) + "_" + idxName; - - return metaStore.getOrAllocateForTree(idxName); - } - - /** {@inheritDoc} */ - @Override public void dropRootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { - if (grp.sharedGroup()) - idxName = Integer.toString(cacheId) + "_" + idxName; - - metaStore.dropRootPage(idxName); - } - - /** {@inheritDoc} */ - @Override public ReuseList reuseListForIndex(String idxName) { - return reuseList; - } - - /** {@inheritDoc} */ - @Override public void stop() { - if (grp.affinityNode()) - ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this); - } - - /** - * @return Meta root pages info. - * @throws IgniteCheckedException If failed. - */ - private Metas getOrAllocateCacheMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = ctx.wal(); - - int grpId = grp.groupId(); - long metaId = pageMem.metaPageId(grpId); - long metaPage = pageMem.acquirePage(grpId, metaId); - - try { - final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage); - - boolean allocated = false; - - try { - long metastoreRoot, reuseListRoot; - - if (PageIO.getType(pageAddr) != PageIO.T_META) { - PageMetaIO pageIO = PageMetaIO.VERSIONS.latest(); - - pageIO.initNewPage(pageAddr, metaId, pageMem.pageSize()); - - metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); - reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); - - pageIO.setTreeRoot(pageAddr, metastoreRoot); - pageIO.setReuseListRoot(pageAddr, reuseListRoot); - - if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) - wal.log(new MetaPageInitRecord( - grpId, - metaId, - pageIO.getType(), - pageIO.getVersion(), - metastoreRoot, - reuseListRoot - )); - - allocated = true; - } - else { - PageMetaIO pageIO = PageIO.getPageIO(pageAddr); - - metastoreRoot = pageIO.getTreeRoot(pageAddr); - reuseListRoot = pageIO.getReuseListRoot(pageAddr); - - assert reuseListRoot != 0L; - } - - return new Metas( - new RootPage(new FullPageId(metastoreRoot, grpId), allocated), - new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); - } - finally { - pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated); - } - } - finally { - pageMem.releasePage(grpId, metaId, metaPage); - } - } - - /** {@inheritDoc} */ - @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, - Long partCntrSince) throws IgniteCheckedException { - if (partCntrSince == null) - return super.rebalanceIterator(part, topVer, partCntrSince); - - GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)ctx.database(); - - try { - WALPointer startPtr = database.searchPartitionCounter(grp.groupId(), part, partCntrSince); - - if (startPtr == null) { - assert false : "partCntr=" + partCntrSince + ", reservations=" + S.toString(Map.class, database.reservedForPreloading()); - - return super.rebalanceIterator(part, topVer, partCntrSince); - } - - WALIterator it = ctx.wal().replay(startPtr); - - return new RebalanceIteratorAdapter(grp, it, part); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to create WAL-based rebalance iterator (a full partition will transferred to a " + - "remote node) [part=" + part + ", partCntrSince=" + partCntrSince + ", err=" + e + ']'); - - return super.rebalanceIterator(part, topVer, partCntrSince); - } - } - - /** - * - */ - private static class RebalanceIteratorAdapter implements IgniteRebalanceIterator { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** Cache group caches. */ - private final Set cacheGrpCaches; - - /** WAL iterator. */ - private final WALIterator walIt; - - /** Partition to scan. */ - private final int part; - - /** */ - private Iterator entryIt; - - /** */ - private CacheDataRow next; - - /** - * @param grp Cache group. - * @param walIt WAL iterator. - * @param part Partition ID. - */ - private RebalanceIteratorAdapter(CacheGroupContext grp, WALIterator walIt, int part) { - this.cacheGrpCaches = grp.cacheIds(); - this.walIt = walIt; - this.part = part; - - advance(); - } - - /** {@inheritDoc} */ - @Override public boolean historical() { - return true; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - walIt.close(); - } - - /** {@inheritDoc} */ - @Override public boolean isClosed() { - return walIt.isClosed(); - } - - /** {@inheritDoc} */ - @Override public boolean hasNextX() { - return hasNext(); - } - - /** {@inheritDoc} */ - @Override public CacheDataRow nextX() throws IgniteCheckedException { - return next(); - } - - /** {@inheritDoc} */ - @Override public void removeX() throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public Iterator iterator() { - return this; - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return next != null; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow next() { - if (next == null) - throw new NoSuchElementException(); - - CacheDataRow val = next; - - advance(); - - return val; - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * - */ - private void advance() { - next = null; - - while (true) { - if (entryIt != null) { - while (entryIt.hasNext()) { - DataEntry entry = entryIt.next(); - - if (entry.partitionId() == part && cacheGrpCaches.contains(entry.cacheId())) { - - next = new DataEntryRow(entry); - - return; - } - } - } - - entryIt = null; - - while (walIt.hasNext()) { - IgniteBiTuple rec = walIt.next(); - - if (rec.get2() instanceof DataRecord) { - DataRecord data = (DataRecord)rec.get2(); - - entryIt = data.writeEntries().iterator(); - // Move on to the next valid data entry. - - break; - } - } - - if (entryIt == null) - return; - } - } - } - - /** - * Data entry row. - */ - private static class DataEntryRow implements CacheDataRow { - /** */ - private final DataEntry entry; - - /** - * @param entry Data entry. - */ - private DataEntryRow(DataEntry entry) { - this.entry = entry; - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject key() { - return entry.key(); - } - - /** {@inheritDoc} */ - @Override public void key(KeyCacheObject key) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public CacheObject value() { - return entry.value(); - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return entry.writeVersion(); - } - - /** {@inheritDoc} */ - @Override public long expireTime() { - return entry.expireTime(); - } - - /** {@inheritDoc} */ - @Override public int partition() { - return entry.partitionId(); - } - - /** {@inheritDoc} */ - @Override public long link() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void link(long link) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public int hash() { - return entry.key().hashCode(); - } - - /** {@inheritDoc} */ - @Override public int cacheId() { - return entry.cacheId(); - } - } - - /** - * - */ - private static class Metas { - /** */ - @GridToStringInclude - private final RootPage reuseListRoot; - - /** */ - @GridToStringInclude - private final RootPage treeRoot; - - /** - * @param treeRoot Metadata storage root. - * @param reuseListRoot Reuse list root. - */ - Metas(RootPage treeRoot, RootPage reuseListRoot) { - this.treeRoot = treeRoot; - this.reuseListRoot = reuseListRoot; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Metas.class, this); - } - } - - /** - * - */ - private class GridCacheDataStore implements CacheDataStore { - /** */ - private final int partId; - - /** */ - private String name; - - /** */ - private volatile FreeListImpl freeList; - - /** */ - private volatile CacheDataStore delegate; - - /** */ - private final boolean exists; - - /** */ - private final AtomicBoolean init = new AtomicBoolean(); - - /** */ - private final CountDownLatch latch = new CountDownLatch(1); - - /** - * @param partId Partition. - * @param exists {@code True} if store for this index exists. - */ - private GridCacheDataStore(int partId, boolean exists) { - this.partId = partId; - this.exists = exists; - - name = treeName(partId); - } - - /** - * @return Store delegate. - * @throws IgniteCheckedException If failed. - */ - private CacheDataStore init0(boolean checkExists) throws IgniteCheckedException { - CacheDataStore delegate0 = delegate; - - if (delegate0 != null) - return delegate0; - - if (checkExists) { - if (!exists) - return null; - } - - IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); - - dbMgr.checkpointReadLock(); - - if (init.compareAndSet(false, true)) { - try { - Metas metas = getOrAllocatePartitionMetas(); - - RootPage reuseRoot = metas.reuseListRoot; - - freeList = new FreeListImpl( - grp.groupId(), - grp.cacheOrGroupName() + "-" + partId, - grp.memoryPolicy().memoryMetrics(), - grp.memoryPolicy(), - null, - ctx.wal(), - reuseRoot.pageId().pageId(), - reuseRoot.isAllocated()) { - @Override protected long allocatePageNoReuse() throws IgniteCheckedException { - return pageMem.allocatePage(cacheId, partId, PageIdAllocator.FLAG_DATA); - } - }; - - CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId); - - RootPage treeRoot = metas.treeRoot; - - CacheDataTree dataTree = new CacheDataTree( - grp, - name, - freeList, - rowStore, - treeRoot.pageId().pageId(), - treeRoot.isAllocated()) { - @Override protected long allocatePageNoReuse() throws IgniteCheckedException { - return pageMem.allocatePage(cacheId, partId, PageIdAllocator.FLAG_DATA); - } - }; - - PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - - delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree); - - int grpId = grp.groupId(); - long partMetaId = pageMem.partitionMetaPageId(grpId, partId); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); - - try { - long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); - - try { - if (PageIO.getType(pageAddr) != 0) { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); - - Map cacheSizes = null; - - if (grp.sharedGroup()) { - long cntrsPageId = io.getCountersPageId(pageAddr); - - if (cntrsPageId != 0L) { - cacheSizes = new HashMap<>(); - - long nextId = cntrsPageId; - - while (true){ - final long curId = nextId; - final long curPage = pageMem.acquirePage(grpId, curId); - - try { - final long curAddr = pageMem.readLock(grpId, curId, curPage); - - assert curAddr != 0; - - try { - PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr); - - if (cntrsIO.readCacheSizes(curAddr, cacheSizes)) - break; - - nextId = cntrsIO.getNextCountersPageId(curAddr); - - assert nextId != 0; - } - finally { - pageMem.readUnlock(grpId, curId, curPage); - } - } - finally { - pageMem.releasePage(grpId, curId, curPage); - } - } - } - } - - delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes); - - globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); - } - } - finally { - pageMem.readUnlock(grpId, partMetaId, partMetaPage); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - - delegate = delegate0; - } - finally { - latch.countDown(); - - dbMgr.checkpointReadUnlock(); - } - } - else { - dbMgr.checkpointReadUnlock(); - - U.await(latch); - - delegate0 = delegate; - - if (delegate0 == null) - throw new IgniteCheckedException("Cache store initialization failed."); - } - - return delegate0; - } - - /** - * @return Partition metas. - */ - private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = ctx.wal(); - - int grpId = grp.groupId(); - long partMetaId = pageMem.partitionMetaPageId(grpId, partId); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); - try { - boolean allocated = false; - long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); - - try { - long treeRoot, reuseListRoot; - - // Initialize new page. - if (PageIO.getType(pageAddr) != PageIO.T_PART_META) { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); - - io.initNewPage(pageAddr, partMetaId, pageMem.pageSize()); - - treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); - reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); - - assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA; - assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA; - - io.setTreeRoot(pageAddr, treeRoot); - io.setReuseListRoot(pageAddr, reuseListRoot); - - if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) - wal.log(new MetaPageInitRecord( - grpId, - partMetaId, - io.getType(), - io.getVersion(), - treeRoot, - reuseListRoot - )); - - allocated = true; - } - else { - PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); - - treeRoot = io.getTreeRoot(pageAddr); - reuseListRoot = io.getReuseListRoot(pageAddr); - - assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA : - U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId; - assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA : - U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId; - } - - return new Metas( - new RootPage(new FullPageId(treeRoot, grpId), allocated), - new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); - } - finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - } - - /** {@inheritDoc} */ - @Override public int partId() { - return partId; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public RowStore rowStore() { - CacheDataStore delegate0 = delegate; - - return delegate0 == null ? null : delegate0.rowStore(); - } - - /** {@inheritDoc} */ - @Override public int fullSize() { - try { - CacheDataStore delegate0 = init0(true); - - return delegate0 == null ? 0 : delegate0.fullSize(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public int cacheSize(int cacheId) { - try { - CacheDataStore delegate0 = init0(true); - - return delegate0 == null ? 0 : delegate0.cacheSize(cacheId); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public Map cacheSizes() { - try { - CacheDataStore delegate0 = init0(true); - - return delegate0 == null ? null : delegate0.cacheSizes(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public long updateCounter() { - try { - CacheDataStore delegate0 = init0(true); - - return delegate0 == null ? 0 : delegate0.updateCounter(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void init(long size, long updCntr, @Nullable Map cacheSizes) { - throw new IllegalStateException("Should be never called."); - } - - /** {@inheritDoc} */ - @Override public void updateCounter(long val) { - try { - CacheDataStore delegate0 = init0(false); - - if (delegate0 != null) - delegate0.updateCounter(val); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public long nextUpdateCounter() { - try { - CacheDataStore delegate0 = init0(false); - - return delegate0 == null ? 0 : delegate0.nextUpdateCounter(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public Long initialUpdateCounter() { - try { - CacheDataStore delegate0 = init0(true); - - return delegate0 == null ? 0 : delegate0.initialUpdateCounter(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void updateInitialCounter(long cntr) { - try { - CacheDataStore delegate0 = init0(true); - - if (delegate0 != null) - delegate0.updateInitialCounter(cntr); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void update( - GridCacheContext cctx, - KeyCacheObject key, - CacheObject val, - GridCacheVersion ver, - long expireTime, - @Nullable CacheDataRow oldRow - ) throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - delegate.update(cctx, key, val, ver, expireTime, oldRow); - } - - /** {@inheritDoc} */ - @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - delegate.updateIndexes(cctx, key); - } - - /** {@inheritDoc} */ - @Override public CacheDataRow createRow( - GridCacheContext cctx, - KeyCacheObject key, - CacheObject val, - GridCacheVersion ver, - long expireTime, - @Nullable CacheDataRow oldRow) throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); - } - - /** {@inheritDoc} */ - @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) - throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - delegate.invoke(cctx, key, c); - } - - /** {@inheritDoc} */ - @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) - throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - delegate.remove(cctx, key, partId); - } - - /** {@inheritDoc} */ - @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - return delegate.find(cctx, key); - - return null; - } - - /** {@inheritDoc} */ - @Override public GridCursor cursor() throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - return delegate.cursor(); - - return EMPTY_CURSOR; - } - - /** {@inheritDoc} */ - @Override public GridCursor cursor( - int cacheId, - KeyCacheObject lower, - KeyCacheObject upper) throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - return delegate.cursor(cacheId, lower, upper); - - return EMPTY_CURSOR; - } - - /** {@inheritDoc} */ - @Override public GridCursor cursor(int cacheId, - KeyCacheObject lower, - KeyCacheObject upper, - Object x) - throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - return delegate.cursor(cacheId, lower, upper, x); - - return EMPTY_CURSOR; - } - - /** {@inheritDoc} */ - @Override public void destroy() throws IgniteCheckedException { - // No need to destroy delegate. - } - - /** {@inheritDoc} */ - @Override public GridCursor cursor(int cacheId) throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - return delegate.cursor(cacheId); - - return EMPTY_CURSOR; - } - - /** {@inheritDoc} */ - @Override public void clear(int cacheId) throws IgniteCheckedException { - CacheDataStore delegate = init0(true); - - if (delegate != null) - delegate.clear(cacheId); - } - } - - /** - * - */ - private static final GridCursor EMPTY_CURSOR = new GridCursor() { - /** {@inheritDoc} */ - @Override public boolean next() { - return false; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow get() { - return null; - } - }; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java deleted file mode 100644 index 25d95ae..0000000 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.internal.processors.cache.database; - -import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; -import org.apache.ignite.mxbean.PersistenceMetricsMXBean; - -/** - * - */ -public class PersistenceMetricsImpl implements PersistenceMetricsMXBean { - /** */ - private volatile HitRateMetrics walLoggingRate; - - /** */ - private volatile HitRateMetrics walWritingRate; - - /** */ - private volatile HitRateMetrics walFsyncTimeDuration; - - /** */ - private volatile HitRateMetrics walFsyncTimeNumber; - - /** */ - private volatile long lastCpLockWaitDuration; - - /** */ - private volatile long lastCpMarkDuration; - - /** */ - private volatile long lastCpPagesWriteDuration; - - /** */ - private volatile long lastCpDuration; - - /** */ - private volatile long lastCpFsyncDuration; - - /** */ - private volatile long lastCpTotalPages; - - /** */ - private volatile long lastCpDataPages; - - /** */ - private volatile long lastCpCowPages; - - /** */ - private volatile long rateTimeInterval; - - /** */ - private volatile int subInts; - - /** */ - private volatile boolean metricsEnabled; - - /** */ - private IgniteWriteAheadLogManager wal; - - /** - * @param metricsEnabled Metrics enabled flag. - * @param rateTimeInterval Rate time interval. - * @param subInts Number of sub-intervals. - */ - public PersistenceMetricsImpl( - boolean metricsEnabled, - long rateTimeInterval, - int subInts - ) { - this.metricsEnabled = metricsEnabled; - this.rateTimeInterval = rateTimeInterval; - this.subInts = subInts; - - resetRates(); - } - - /** {@inheritDoc} */ - @Override public float getWalLoggingRate() { - if (!metricsEnabled) - return 0; - - return ((float)walLoggingRate.getRate()) / rateTimeInterval; - } - - /** {@inheritDoc} */ - @Override public float getWalWritingRate() { - if (!metricsEnabled) - return 0; - - return ((float)walWritingRate.getRate()) / rateTimeInterval; - } - - /** {@inheritDoc} */ - @Override public int getWalArchiveSegments() { - if (!metricsEnabled) - return 0; - - return wal.walArchiveSegments(); - } - - /** {@inheritDoc} */ - @Override public float getWalFsyncTimeAverage() { - if (!metricsEnabled) - return 0; - - long numRate = walFsyncTimeNumber.getRate(); - - if (numRate == 0) - return 0; - - return (float)walFsyncTimeDuration.getRate() / numRate; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointingDuration() { - if (!metricsEnabled) - return 0; - - return lastCpDuration; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointLockWaitDuration() { - if (!metricsEnabled) - return 0; - - return lastCpLockWaitDuration; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointMarkDuration() { - if (!metricsEnabled) - return 0; - - return lastCpMarkDuration; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointPagesWriteDuration() { - if (!metricsEnabled) - return 0; - - return lastCpPagesWriteDuration; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointFsyncDuration() { - if (!metricsEnabled) - return 0; - - return lastCpFsyncDuration; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointTotalPagesNumber() { - if (!metricsEnabled) - return 0; - - return lastCpTotalPages; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointDataPagesNumber() { - if (!metricsEnabled) - return 0; - - return lastCpDataPages; - } - - /** {@inheritDoc} */ - @Override public long getLastCheckpointCopiedOnWritePagesNumber() { - if (!metricsEnabled) - return 0; - - return lastCpCowPages; - } - - /** {@inheritDoc} */ - @Override public void enableMetrics() { - metricsEnabled = true; - } - - /** {@inheritDoc} */ - @Override public void disableMetrics() { - metricsEnabled = false; - } - - /** {@inheritDoc} */ - @Override public void rateTimeInterval(long rateTimeInterval) { - this.rateTimeInterval = rateTimeInterval; - - resetRates(); - } - - /** {@inheritDoc} */ - @Override public void subIntervals(int subInts) { - this.subInts = subInts; - - resetRates(); - } - - /** - * @param wal Write-ahead log manager. - */ - public void wal(IgniteWriteAheadLogManager wal) { - this.wal = wal; - } - - /** - * @return Metrics enabled flag. - */ - public boolean metricsEnabled() { - return metricsEnabled; - } - - /** - * @param lockWaitDuration Lock wait duration. - * @param markDuration Mark duration. - * @param pagesWriteDuration Pages write duration. - * @param fsyncDuration Total checkpoint fsync duration. - * @param duration Total checkpoint duration. - * @param totalPages Total number of all pages in checkpoint. - * @param dataPages Total number of data pages in checkpoint. - * @param cowPages Total number of COW-ed pages in checkpoint. - */ - public void onCheckpoint( - long lockWaitDuration, - long markDuration, - long pagesWriteDuration, - long fsyncDuration, - long duration, - long totalPages, - long dataPages, - long cowPages - ) { - if (metricsEnabled) { - lastCpLockWaitDuration = lockWaitDuration; - lastCpMarkDuration = markDuration; - lastCpPagesWriteDuration = pagesWriteDuration; - lastCpFsyncDuration = fsyncDuration; - lastCpDuration = duration; - lastCpTotalPages = totalPages; - lastCpDataPages = dataPages; - lastCpCowPages = cowPages; - } - } - - /** - * - */ - public void onWalRecordLogged() { - walLoggingRate.onHit(); - } - - /** - * @param size Size written. - */ - public void onWalBytesWritten(int size) { - walWritingRate.onHits(size); - } - - /** - * @param nanoTime Fsync nano time. - */ - public void onFsync(long nanoTime) { - long microseconds = nanoTime / 1_000; - - walFsyncTimeDuration.onHits(microseconds); - walFsyncTimeNumber.onHit(); - } - - /** - * - */ - private void resetRates() { - walLoggingRate = new HitRateMetrics((int)rateTimeInterval, subInts); - walWritingRate = new HitRateMetrics((int)rateTimeInterval, subInts); - - walFsyncTimeDuration = new HitRateMetrics((int)rateTimeInterval, subInts); - walFsyncTimeNumber = new HitRateMetrics((int)rateTimeInterval, subInts); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java deleted file mode 100755 index 2042358..0000000 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStore.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.database.file; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.FileChannel; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.internal.pagemem.PageIdUtils; -import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.database.wal.crc.IgniteDataIntegrityViolationException; -import org.apache.ignite.internal.processors.cache.database.wal.crc.PureJavaCrc32; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; - -/** - * File page store. - */ -public class FilePageStore implements PageStore { - /** Page store file signature. */ - private static final long SIGNATURE = 0xF19AC4FE60C530B8L; - - /** File version. */ - private static final int VERSION = 1; - - /** Allocated field offset. */ - public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/; - - /** */ - private final File cfgFile; - - /** */ - private final byte type; - - /** Database configuration. */ - private final MemoryConfiguration dbCfg; - - /** */ - private RandomAccessFile file; - - /** */ - private FileChannel ch; - - /** */ - private final AtomicLong allocated; - - /** */ - private final int pageSize; - - /** */ - private volatile boolean inited; - - /** */ - private volatile boolean recover; - - /** */ - private volatile int tag; - - /** */ - private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false); - - /** */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * @param file File. - */ - public FilePageStore(byte type, File file, MemoryConfiguration cfg) { - this.type = type; - - cfgFile = file; - dbCfg = cfg; - - allocated = new AtomicLong(); - - pageSize = dbCfg.getPageSize(); - } - - /** {@inheritDoc} */ - @Override public boolean exists() { - return cfgFile.exists() && cfgFile.length() > HEADER_SIZE; - } - - /** - * @param type Type. - * @param pageSize Page size. - * @return Byte buffer instance. - */ - public static ByteBuffer header(byte type, int pageSize) { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); - - hdr.putLong(SIGNATURE); - - hdr.putInt(VERSION); - - hdr.put(type); - - hdr.putInt(pageSize); - - hdr.rewind(); - - return hdr; - } - - /** - * - */ - private long initFile() { - try { - ByteBuffer hdr = header(type, dbCfg.getPageSize()); - - while (hdr.remaining() > 0) - ch.write(hdr); - } - catch (IOException e) { - throw new IgniteException("Check file failed.", e); - } - - //there is 'super' page in every file - return HEADER_SIZE + dbCfg.getPageSize(); - } - - /** - * - */ - private long checkFile() throws IgniteCheckedException { - try { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); - - while (hdr.remaining() > 0) - ch.read(hdr); - - hdr.rewind(); - - long signature = hdr.getLong(); - - if (SIGNATURE != signature) - throw new IgniteCheckedException("Failed to verify store file (invalid file signature)" + - " [expectedSignature=" + U.hexLong(SIGNATURE) + - ", actualSignature=" + U.hexLong(signature) + ']'); - - int ver = hdr.getInt(); - - if (VERSION != ver) - throw new IgniteCheckedException("Failed to verify store file (invalid file version)" + - " [expectedVersion=" + VERSION + - ", fileVersion=" + ver + "]"); - - byte type = hdr.get(); - - if (this.type != type) - throw new IgniteCheckedException("Failed to verify store file (invalid file type)" + - " [expectedFileType=" + this.type + - ", actualFileType=" + type + "]"); - - int pageSize = hdr.getInt(); - - if (dbCfg.getPageSize() != pageSize) - throw new IgniteCheckedException("Failed to verify store file (invalid page size)" + - " [expectedPageSize=" + dbCfg.getPageSize() + - ", filePageSize=" + pageSize + "]"); - - long fileSize = file.length(); - - if (fileSize == HEADER_SIZE) // Every file has a special meta page. - fileSize = pageSize + HEADER_SIZE; - - if ((fileSize - HEADER_SIZE) % pageSize != 0) - throw new IgniteCheckedException("Failed to verify store file (invalid file size)" + - " [fileSize=" + U.hexLong(fileSize) + - ", pageSize=" + U.hexLong(pageSize) + ']'); - - return fileSize; - } - catch (IOException e) { - throw new IgniteCheckedException("File check failed", e); - } - } - - /** - * @param cleanFile {@code True} to delete file. - * @throws IgniteCheckedException If failed. - */ - public void stop(boolean cleanFile) throws IgniteCheckedException { - lock.writeLock().lock(); - - try { - if (!inited) - return; - - ch.force(false); - - file.close(); - - if (cleanFile) - cfgFile.delete(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * - */ - public void truncate(int tag) throws IgniteCheckedException { - lock.writeLock().lock(); - - try { - if (!inited) - return; - - this.tag = tag; - - ch.position(0); - - file.setLength(0); - - allocated.set(initFile()); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * - */ - public void beginRecover() { - lock.writeLock().lock(); - - try { - recover = true; - } - finally { - lock.writeLock().unlock(); - } - } - - /** - * - */ - public void finishRecover() { - lock.writeLock().lock(); - - try { - if (inited) - allocated.set(ch.size()); - - recover = false; - } - catch (IOException e) { - throw new RuntimeException(e); - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException { - init(); - - try { - long off = pageOffset(pageId); - - assert pageBuf.capacity() == pageSize; - assert pageBuf.position() == 0; - assert pageBuf.order() == ByteOrder.nativeOrder(); - - int len = pageSize; - - do { - int n = ch.read(pageBuf, off); - - // If page was not written yet, nothing to read. - if (n < 0) { - pageBuf.put(new byte[pageBuf.remaining()]); - - return; - } - - off += n; - - len -= n; - } - while (len > 0); - - int savedCrc32 = PageIO.getCrc(pageBuf); - - PageIO.setCrc(pageBuf, 0); - - pageBuf.position(0); - - if (!skipCrc) { - int curCrc32 = PureJavaCrc32.calcCrc32(pageBuf, pageSize); - - if ((savedCrc32 ^ curCrc32) != 0) - throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " + - "[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) + - ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() + - ", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]"); - } - - assert PageIO.getCrc(pageBuf) == 0; - - if (keepCrc) - PageIO.setCrc(pageBuf, savedCrc32); - } - catch (IOException e) { - throw new IgniteCheckedException("Read error", e); - } - } - - /** {@inheritDoc} */ - @Override public void readHeader(ByteBuffer buf) throws IgniteCheckedException { - init(); - - try { - assert buf.remaining() == HEADER_SIZE; - - int len = HEADER_SIZE; - - long off = 0; - - do { - int n = ch.read(buf, off); - - // If page was not written yet, nothing to read. - if (n < 0) - return; - - off += n; - - len -= n; - } - while (len > 0); - } - catch (IOException e) { - throw new IgniteCheckedException("Read error", e); - } - } - - /** - * @throws IgniteCheckedException If failed to initialize store file. - */ - private void init() throws IgniteCheckedException { - if (!inited) { - lock.writeLock().lock(); - - try { - if (!inited) { - RandomAccessFile rndFile = null; - - IgniteCheckedException err = null; - - try { - file = rndFile = new RandomAccessFile(cfgFile, "rw"); - - ch = file.getChannel(); - - if (file.length() == 0) - allocated.set(initFile()); - else - allocated.set(checkFile()); - - inited = true; - } - catch (IOException e) { - throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e); - } - finally { - if (err != null && rndFile != null) - try { - rndFile.close(); - } - catch (IOException e) { - err.addSuppressed(e); - } - } - } - } - finally { - lock.writeLock().unlock(); - } - } - } - - /** {@inheritDoc} */ - @Override public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException { - init(); - - lock.readLock().lock(); - - try { - if (tag < this.tag) - return; - - long off = pageOffset(pageId); - - assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover : - "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId); - - assert pageBuf.capacity() == pageSize; - assert pageBuf.position() == 0; - assert pageBuf.order() == ByteOrder.nativeOrder(); - assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId); - - int crc32 = skipCrc ? 0 : PureJavaCrc32.calcCrc32(pageBuf, pageSize); - - PageIO.setCrc(pageBuf, crc32); - - pageBuf.position(0); - - int len = pageSize; - - do { - int n = ch.write(pageBuf, off); - - off += n; - - len -= n; - } - while (len > 0); - - PageIO.setCrc(pageBuf, 0); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to write the page to the file store [pageId=" + pageId + - ", file=" + cfgFile.getAbsolutePath() + ']', e); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public long pageOffset(long pageId) { - return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE; - } - - /** {@inheritDoc} */ - @Override public void sync() throws IgniteCheckedException { - lock.writeLock().lock(); - - try { - init(); - - ch.force(false); - } - catch (IOException e) { - throw new IgniteCheckedException("Sync error", e); - } - finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void ensure() throws IgniteCheckedException { - init(); - } - - /** {@inheritDoc} */ - @Override public long allocatePage() throws IgniteCheckedException { - init(); - - long off = allocPage(); - - return off / pageSize; - } - - /** - * - */ - private long allocPage() { - long off; - - do { - off = allocated.get(); - - if (allocated.compareAndSet(off, off + pageSize)) - break; - } - while (true); - - return off; - } - - /** {@inheritDoc} */ - @Override public int pages() { - if (!inited) - return 0; - - return (int)(allocated.get() / pageSize); - } -}