Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0A4A5200CC4 for ; Thu, 13 Jul 2017 12:20:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 08F0416BD3E; Thu, 13 Jul 2017 10:20:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 091EF16BD3D for ; Thu, 13 Jul 2017 12:20:40 +0200 (CEST) Received: (qmail 70691 invoked by uid 500); 13 Jul 2017 10:20:38 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 70567 invoked by uid 99); 13 Jul 2017 10:20:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jul 2017 10:20:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 183A5F54E0; Thu, 13 Jul 2017 10:20:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 13 Jul 2017 10:20:43 -0000 Message-Id: <884493b9fcad48338fe8f1359d64d30f@git.apache.org> In-Reply-To: <6e920b89663243e1b6def36c1371f467@git.apache.org> References: <6e920b89663243e1b6def36c1371f467@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/10] ignite git commit: Optimized snapshot progress tracking archived-at: Thu, 13 Jul 2017 10:20:43 -0000 Optimized snapshot progress tracking Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21964fb5 Tree: 
http://git-wip-us.apache.org/repos/asf/ignite/tree/21964fb5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21964fb5 Branch: refs/heads/ignite-2.1 Commit: 21964fb5f6fb6fee891283332202cbc9ed5ac3f3 Parents: 3787181 Author: Dmitry Pavlov Authored: Wed Jul 12 18:59:10 2017 +0300 Committer: Alexey Goncharuk Committed: Wed Jul 12 18:59:10 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/pagemem/FullPageId.java | 6 +- .../ignite/internal/pagemem/PageIdUtils.java | 14 +- .../pagemem/store/IgnitePageStoreManager.java | 1 + .../internal/pagemem/store/PageStore.java | 2 + .../delta/MetaPageUpdateLastAllocatedIndex.java | 2 +- .../cache/persistence/DbCheckpointListener.java | 7 +- .../FullPageIdIterableComparator.java | 51 ------- .../GridCacheDatabaseSharedManager.java | 63 +++++--- .../persistence/GridCacheOffheapManager.java | 56 +++---- .../cache/persistence/file/FilePageStore.java | 2 +- .../persistence/file/FilePageStoreManager.java | 1 + .../cache/persistence/pagemem/PageMemoryEx.java | 8 +- .../persistence/pagemem/PageMemoryImpl.java | 10 +- .../persistence/partstate/GroupPartitionId.java | 145 +++++++++++++++++++ .../partstate/PagesAllocationRange.java | 68 +++++++++ .../partstate/PartitionAllocationMap.java | 113 +++++++++++++++ .../snapshot/IgniteCacheSnapshotManager.java | 17 ++- .../cache/persistence/tree/io/PageMetaIO.java | 27 ++-- .../persistence/tree/io/TrackingPageIO.java | 8 +- .../persistence/tree/util/PageHandler.java | 3 +- 20 files changed, 466 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java index 00f52c1..9e24943 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java @@ -21,7 +21,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; /** - * Compound object used to address a page in the global page space. + * Compound object used to address a page in the global page space. *

<h3>Page ID structure</h3>
 * <p>

* Generally, a full page ID consists of a cache ID and page ID. A page ID consists of @@ -49,13 +49,13 @@ import org.apache.ignite.internal.util.typedef.internal.U; * Effective page ID is page ID with zeroed bits used for page ID rotation. */ public class FullPageId { - /** */ + /** Page ID. */ private final long pageId; /** */ private final long effectivePageId; - /** */ + /** Cache group ID. */ private final int grpId; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java index 92f427a..6f4ba93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java @@ -49,10 +49,10 @@ public final class PageIdUtils { /** */ public static final long TAG_MASK = ~(-1L << TAG_SIZE); - /** */ + /** Page Index is a monotonically growing number within each partition */ public static final long PART_ID_MASK = ~(-1L << PART_ID_SIZE); - /** */ + /** Flags mask. Flags consists from a number of reserved bits, and page type (data/index page) */ public static final long FLAG_MASK = ~(-1L << FLAG_SIZE); /** */ @@ -92,10 +92,10 @@ public final class PageIdUtils { } /** - * Extracts a page index from the given pageId. + * Extracts a page index from the given page ID. * - * @param pageId Page id. - * @return Page ID. + * @param pageId Page ID. + * @return Page index. */ public static int pageIndex(long pageId) { return (int)(pageId & PAGE_IDX_MASK); // 4 bytes @@ -150,7 +150,9 @@ public final class PageIdUtils { /** * @param partId Partition ID. - * @return Part ID constructed from the given cache ID and partition ID. 
+ * @param flag Flags (a number of reserved bits, and page type (data/index page)) + * @param pageIdx Page index, monotonically growing number within each partition + * @return Page ID constructed from the given pageIdx and partition ID, see {@link FullPageId} */ public static long pageId(int partId, byte flag, int pageIdx) { long pageId = flag & FLAG_MASK; http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index a1b766f..eaa85ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -77,6 +77,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh * * @param grpId Cache group ID of the evicted partition. * @param partId Partition ID. + * @param tag Partition tag (growing 1-based partition file version). * @throws IgniteCheckedException If failed to handle partition destroy callback. 
*/ public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java index be83704..4698a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java @@ -70,6 +70,8 @@ public interface PageStore { * * @param pageId Page ID. * @param pageBuf Page buffer to write. + * @param tag Partition file version, 1-based incrementing counter. For outdated pages {@code tag} has lower value, + * and write does nothing * @throws IgniteCheckedException If page writing failed (IO error occurred). 
*/ public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java index 60aebde..11b2a67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java @@ -44,7 +44,7 @@ public class MetaPageUpdateLastAllocatedIndex extends PageDeltaRecord { PageMetaIO io = PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr)); - io.setLastPageCount(pageAddr, lastAllocatedIdx); + io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java index 0b28b6a..1c438b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java @@ -17,9 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence; -import 
java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; /** * @@ -35,9 +34,9 @@ public interface DbCheckpointListener { public boolean nextSnapshot(); /** - * + * @return Partition allocation statistic map */ - public Map, T2> partitionStatMap(); + public PartitionAllocationMap partitionStatMap(); /** * @param cacheOrGrpName Cache or group name. http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java deleted file mode 100644 index c056c52..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/FullPageIdIterableComparator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence; - -import java.io.Serializable; -import java.util.Comparator; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * - */ -public class FullPageIdIterableComparator implements Comparator>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - public static final FullPageIdIterableComparator INSTANCE = new FullPageIdIterableComparator(); - - /** {@inheritDoc} */ - @Override public int compare(T2 o1, T2 o2) { - if (o1.get1() < o2.get1()) - return -1; - - if (o1.get1() > o2.get1()) - return 1; - - if (o1.get2() < o2.get2()) - return -1; - - if (o1.get2() > o2.get2()) - return 1; - - return 0; - } -} - http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5136731..9f2067a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -40,7 +40,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -52,6 +51,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; 
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; @@ -109,6 +109,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -263,8 +264,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private boolean stopping; - /** Checkpoint runner thread pool. */ - private ExecutorService asyncRunner; + /** Checkpoint runner thread pool. If null tasks are to be run in single thread */ + @Nullable private ExecutorService asyncRunner; /** Buffer for the checkpoint threads. */ private ThreadLocal threadBuf; @@ -1916,6 +1917,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan asyncRunner == null ? 
1 : chp.cpPages.collectionsSize()); tracker.onPagesWriteStart(); + final AtomicInteger writtenPagesCtr = new AtomicInteger(); + final int totalPagesToWriteCnt = chp.cpPages.size(); if (asyncRunner != null) { for (int i = 0; i < chp.cpPages.collectionsSize(); i++) { @@ -1923,7 +1926,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tracker, chp.cpPages.innerCollection(i), updStores, - doneWriteFut + doneWriteFut, + writtenPagesCtr, + totalPagesToWriteCnt ); try { @@ -1937,7 +1942,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } else { // Single-threaded checkpoint. - Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut); + Runnable write = new WriteCheckpointPages(tracker, + chp.cpPages, + updStores, + doneWriteFut, + writtenPagesCtr, + totalPagesToWriteCnt); write.run(); } @@ -2092,15 +2102,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan curCpProgress = curr; } - final NavigableMap, T2> map = - new TreeMap<>(FullPageIdIterableComparator.INSTANCE); + final PartitionAllocationMap map = new PartitionAllocationMap(); DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() { @Override public boolean nextSnapshot() { return curr.nextSnapshot; } - @Override public Map, T2> partitionStatMap() { + /** {@inheritDoc} */ + @Override public PartitionAllocationMap partitionStatMap() { return map; } @@ -2278,14 +2288,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - /** - * - */ + /** Pages write task */ private class WriteCheckpointPages implements Runnable { /** */ private CheckpointMetricsTracker tracker; - /** */ + /** Collection of page IDs to write under this task. 
Overall pages to write may be greater than this collection*/ private Collection writePageIds; /** */ @@ -2294,19 +2302,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private CountDownFuture doneFut; + /** Counter for all written pages. May be shared between several workers */ + private AtomicInteger writtenPagesCntr; + + /** Total pages to write, counter may be greater than {@link #writePageIds} size*/ + private final int totalPagesToWrite; + /** - * @param writePageIds Write page IDs. + * Creates task for write pages + * @param tracker + * @param writePageIds Collection of page IDs to write. + * @param updStores + * @param doneFut + * @param writtenPagesCntr all written pages counter, may be shared between several write tasks + * @param totalPagesToWrite total pages to be written under this checkpoint */ private WriteCheckpointPages( - CheckpointMetricsTracker tracker, - Collection writePageIds, - GridConcurrentHashSet updStores, - CountDownFuture doneFut - ) { + final CheckpointMetricsTracker tracker, + final Collection writePageIds, + final GridConcurrentHashSet updStores, + final CountDownFuture doneFut, + @NotNull final AtomicInteger writtenPagesCntr, + final int totalPagesToWrite) { this.tracker = tracker; this.writePageIds = writePageIds; this.updStores = updStores; this.doneFut = doneFut; + this.writtenPagesCntr = writtenPagesCntr; + this.totalPagesToWrite = totalPagesToWrite; } /** {@inheritDoc} */ @@ -2354,7 +2377,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tmpWriteBuf.rewind(); } - snapshotMgr.onPageWrite(fullId, tmpWriteBuf); + int curWrittenPages = writtenPagesCntr.incrementAndGet(); + + snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite); tmpWriteBuf.rewind(); http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index bd902fb..6e6b7df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -52,6 +52,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO; @@ -63,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -207,9 +209,9 @@ 
public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); + long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); - if (pageAddr == 0L) { + if (partMetaPageAddr == 0L) { U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage + ", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size + ", updCntr=" + updCntr + ", state=" + state + ']'); @@ -220,21 +222,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple boolean changed = false; try { - PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); + PagePartitionMetaIO io = PageIO.getPageIO(partMetaPageAddr); - changed |= io.setUpdateCounter(pageAddr, updCntr); - changed |= io.setGlobalRemoveId(pageAddr, rmvId); - changed |= io.setSize(pageAddr, size); + changed |= io.setUpdateCounter(partMetaPageAddr, updCntr); + changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId); + changed |= io.setSize(partMetaPageAddr, size); if (state != null) - changed |= io.setPartitionState(pageAddr, (byte)state.ordinal()); + changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal()); else assert grp.isLocal() : grp.cacheOrGroupName(); long cntrsPageId; if (grp.sharedGroup()) { - cntrsPageId = io.getCountersPageId(pageAddr); + cntrsPageId = io.getCountersPageId(partMetaPageAddr); byte[] data = serializeCacheSizes(store.cacheSizes()); @@ -247,7 +249,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (init && items > 0) { cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); - io.setCountersPageId(pageAddr, cntrsPageId); + io.setCountersPageId(partMetaPageAddr, cntrsPageId); changed = true; } @@ -301,7 +303,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if 
(needSnapshot) { pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); - io.setCandidatePageCount(pageAddr, pageCnt); + io.setCandidatePageCount(partMetaPageAddr, pageCnt); if (saveMeta) { long metaPageId = pageMem.metaPageId(grpId); @@ -345,13 +347,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (partMap.containsKey(store.partId()) && partMap.get(store.partId()) == GridDhtPartitionState.OWNING) - addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(), + addPartition(ctx.partitionStatMap(), partMetaPageAddr, io, grpId, store.partId(), this.ctx.pageStore().pages(grpId, store.partId())); changed = true; } else - pageCnt = io.getCandidatePageCount(pageAddr); + pageCnt = io.getCandidatePageCount(partMetaPageAddr); if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageUpdatePartitionDataRecord( @@ -397,27 +399,29 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** * @param map Map to add values to. - * @param pageAddr page address + * @param metaPageAddr Meta page address * @param io Page Meta IO * @param cacheId Cache ID. - * @param partition Partition ID. - * @param pages Number of pages to add. + * @param partId Partition ID. 
Or {@link PageIdAllocator#INDEX_PARTITION} for index partition + * @param currAllocatedPageCnt total number of pages allocated for partition [partition, cacheId] */ private static void addPartition( - Map, T2> map, - long pageAddr, - PageMetaIO io, - int cacheId, - int partition, - int pages + final PartitionAllocationMap map, + final long metaPageAddr, + final PageMetaIO io, + final int cacheId, + final int partId, + final int currAllocatedPageCnt ) { - if (pages <= 1) + if (currAllocatedPageCnt <= 1) return; - assert PageIO.getPageId(pageAddr) != 0; + assert PageIO.getPageId(metaPageAddr) != 0; - int lastAllocatedIdx = io.getLastPageCount(pageAddr); - map.put(new T2<>(cacheId, partition), new T2<>(lastAllocatedIdx, pages)); + int lastAllocatedPageCnt = io.getLastAllocatedPageCount(metaPageAddr); + map.put( + new GroupPartitionId(cacheId, partId), + new PagesAllocationRange(lastAllocatedPageCnt, currAllocatedPageCnt)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index c827e96..a7ca13c 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -77,7 +77,7 @@ public class FilePageStore implements PageStore { /** */ private volatile boolean recover; - /** */ + /** Partition file version, 1-based incrementing counter. 
For outdated pages tag has low value, and write does nothing */ private volatile int tag; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index af20136..e2ad070 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -318,6 +318,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen * @param cacheId Cache ID to write. * @param pageId Page ID. * @param pageBuf Page buffer. + * @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated * @return PageStore to which the page has been written. * @throws IgniteCheckedException If IO error occurred. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 7c63d41..53e21b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -115,19 +115,19 @@ public interface PageMemoryEx extends PageMemory { * * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by * the {@link #beginCheckpoint()} method call. - * @param tmpBuf Temporary buffer to write changes into. + * @param outBuf Temporary buffer to write changes into. * @param tracker Checkpoint metrics tracker. - * @return {@code True} if data were read, {@code false} otherwise (data already saved to storage). + * @return {@code Partition tag} if data was read, {@code null} otherwise (data already saved to storage). * @throws IgniteException If failed to obtain page data. */ - @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker); + @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker); /** * Marks partition as invalid / outdated. * * @param cacheId Cache ID. * @param partId Partition ID. - * @return New partition tag. + * @return New partition tag (growing 1-based partition file version). 
*/ public int invalidate(int cacheId, int partId); http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 47381d7..1b4cf81 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -792,8 +792,8 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker) { - assert tmpBuf.remaining() == pageSize(); + @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer outBuf, CheckpointMetricsTracker tracker) { + assert outBuf.remaining() == pageSize(); Segment seg = segment(fullId.groupId(), fullId.pageId()); @@ -876,7 +876,7 @@ public class PageMemoryImpl implements PageMemoryEx { } } else - return copyPageForCheckpoint(absPtr, fullId, tmpBuf, tmpBuffer, tracker) ? tag : null; + return copyPageForCheckpoint(absPtr, fullId, outBuf, tmpBuffer, tracker) ? tag : null; } /** @@ -1565,7 +1565,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** */ private final int maxDirtyPages; - /** */ + /** Maps partition (cacheId, partId) to its tag. Tag is 1-based incrementing partition file counter */ private final Map, Integer> partTagMap = new HashMap<>(); /** @@ -1903,7 +1903,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param grpId Cache group ID. * @param partId Partition ID. - * @return Partition tag. 
+ * @return Partition tag. Growing 1 based partition file version */ private int partTag(int grpId, int partId) { assert getReadHoldCount() > 0 || getWriteHoldCount() > 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java new file mode 100644 index 0000000..dbdf670 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.partstate; + +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Pair of cache group ID with partition ID. Immutable, comparable class, may be used as key in maps + */ +public class GroupPartitionId implements Comparable { + /** Index for super(meta) page. There is always such page for iterated cache partition */ + private static final int METAPAGE_IDX = 0; + + /** Cache group ID. */ + private final int grpId; + + /** Partition ID. */ + private final int partId; + + /** + * Creates group-partition tuple. + * + * @param grpId Group ID. + * @param partId Partition ID. + */ + public GroupPartitionId(final int grpId, final int partId) { + this.grpId = grpId; + this.partId = partId; + } + + /** + * @param partId Partition ID. + * @return flag to be used for partition + */ + private static byte getFlagByPartId(final int partId) { + return partId == PageIdAllocator.INDEX_PARTITION ? 
PageMemory.FLAG_IDX : PageMemory.FLAG_DATA; + } + + /** + * @return cache ID + */ + public int getGroupId() { + return grpId; + } + + /** + * @return Partition ID + */ + public int getPartitionId() { + return partId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GroupPartitionId.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GroupPartitionId key = (GroupPartitionId)o; + + if (grpId != key.grpId) + return false; + + return partId == key.partId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = grpId; + + result = 31 * result + partId; + + return result; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GroupPartitionId o) { + if (getGroupId() < o.getGroupId()) + return -1; + + if (getGroupId() > o.getGroupId()) + return 1; + + if (getPartitionId() < o.getPartitionId()) + return -1; + + if (getPartitionId() > o.getPartitionId()) + return 1; + return 0; + } + + /** + * @param pageIdx Page Index, monotonically growing number within each partition + * @return page ID (64 bits) constructed from partition ID and given index + */ + private long createPageId(final int pageIdx) { + final int partId = getPartitionId(); + + return PageIdUtils.pageId(partId, getFlagByPartId(partId), pageIdx); + } + + /** + * Returns Full page ID. For index 0 will return super-page of next partition + * + * @param pageIdx Page Index, monotonically growing number within each partition + * @return FullPageId consists of cache ID (32 bits) and page ID (64 bits). 
+ */ + @NotNull private FullPageId createFullPageId(final int pageIdx) { + return new FullPageId(createPageId(pageIdx), getGroupId()); + } + + /** + * @return will return super-page (metapage) of this partition + */ + @NotNull public FullPageId createFirstPageFullId() { + return createFullPageId(METAPAGE_IDX); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java new file mode 100644 index 0000000..e7170c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PagesAllocationRange.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.partstate; + +/** + * Range of pages allocated. 
+ * Contains currently allocated page count and previously observed page count. + * May be used for tracking history of recent allocation for partition [partition, cacheId] + */ +public class PagesAllocationRange { + /** + * Previously observed total number of allocated pages. May be stored using PageMetaIO. + * Used to separate newly allocated pages with previously observed state + * Minimum value is 0. Can't be greater than {@link #currAllocatedPageCnt} + */ + private final int lastAllocatedPageCnt; + + /** Total current number of pages allocated, minimum value is 0. */ + private final int currAllocatedPageCnt; + + /** + * Creates pages range + * + * @param lastAllocatedPageCnt Last allocated pages count. + * @param currAllocatedPageCnt Currently allocated pages count. + */ + public PagesAllocationRange(final int lastAllocatedPageCnt, final int currAllocatedPageCnt) { + this.lastAllocatedPageCnt = lastAllocatedPageCnt; + this.currAllocatedPageCnt = currAllocatedPageCnt; + } + + /** + * @return Total current number of pages allocated, minimum value is 0. + */ + public int getCurrAllocatedPageCnt() { + return currAllocatedPageCnt; + } + + /** + * @return Previously observed total number of allocated pages. 
+ */ + public int getLastAllocatedPageCnt() { + return lastAllocatedPageCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "PagesAllocationRange{" + + "lastAllocatedPageCnt=" + lastAllocatedPageCnt + + ", currAllocatedPageCnt=" + currAllocatedPageCnt + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java new file mode 100644 index 0000000..9ed4000 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/PartitionAllocationMap.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.partstate; + +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Information structure with partitions state. + * Page counts map. + */ +public class PartitionAllocationMap { + /** Maps following pairs: (groupId, partId) -> (lastAllocatedCount, allocatedCount) */ + private final NavigableMap map = new TreeMap<>(); + + /** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + * + * @param key to get + * @return value or null + */ + @Nullable public PagesAllocationRange get(GroupPartitionId key) { + return map.get(key); + } + + /** + * Extracts partition information from full page ID + * + * @param fullId page related to some cache + * @return pair of cache ID and partition ID + */ + @NotNull public static GroupPartitionId createCachePartId(@NotNull final FullPageId fullId) { + return new GroupPartitionId(fullId.groupId(), PageIdUtils.partId(fullId.pageId())); + } + + /** @return true if this map contains no key-value mappings */ + public boolean isEmpty() { + return map.isEmpty(); + } + + /** @return the number of key-value mappings in this map. */ + public int size() { + return map.size(); + } + + /** @return keys (all caches partitions) */ + public Set keySet() { + return map.keySet(); + } + + /** @return values (allocation ranges) */ + public Iterable values() { + return map.values(); + } + + /** @return Returns the first (lowest) key currently in this map. 
*/ + public GroupPartitionId firstKey() { + return map.firstKey(); + } + + /** + * Returns next (higher) key for provided cache and partition or null + * + * @param key cache and partition to search + * @return first found key which is greater than provided + */ + @Nullable public GroupPartitionId nextKey(@NotNull final GroupPartitionId key) { + return map.navigableKeySet().higher(key); + } + + /** @return set view of the mappings contained in this map, sorted in ascending key order */ + public Set> entrySet() { + return map.entrySet(); + } + + /** @return true if this map contains a mapping for the specified key */ + public boolean containsKey(GroupPartitionId key) { + return map.containsKey(key); + } + + /** + * @param key key with which the specified value is to be associated + * @param val value to be associated with the specified key + * @return the previous value associated with key, or null if there was no mapping for + * key. + */ + public PagesAllocationRange put(GroupPartitionId key, PagesAllocationRange val) { + return map.put(key, val); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java index 0a27bcd..50e6515 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; import 
java.nio.ByteBuffer; -import java.util.NavigableMap; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.events.DiscoveryEvent; @@ -29,8 +28,8 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; -import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -64,12 +63,13 @@ public class IgniteCacheSnapshotManager extends Gri /** * @param snapshotOperation current snapshot operation. + * @param map (cacheId, partId) -> (lastAllocatedIndex, count) * * @return {@code true} if next operation must be snapshot, {@code false} if checkpoint must be executed. */ public boolean onMarkCheckPointBegin( T snapshotOperation, - NavigableMap, T2> map + PartitionAllocationMap map ) throws IgniteCheckedException { return false; } @@ -107,9 +107,16 @@ public class IgniteCacheSnapshotManager extends Gri } /** - * @param fullId Full id. + * @param fullId Full page id. + * @param tmpWriteBuf buffer + * @param writtenPages Overall pages written, negative value means there is no progress tracked + * @param totalPages Overall pages count to be written, should be positive */ - public void onPageWrite(FullPageId fullId, ByteBuffer tmpWriteBuf) { + public void onPageWrite( + final FullPageId fullId, + final ByteBuffer tmpWriteBuf, + final int writtenPages, + final int totalPages) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java index ac482e8..becd3e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageMetaIO.java @@ -43,10 +43,10 @@ public class PageMetaIO extends PageIO { /** Last successful full snapshot tag offset. */ private static final int LAST_SUCCESSFUL_FULL_SNAPSHOT_TAG_OFF = NEXT_SNAPSHOT_TAG_OFF + 8; - /** Last allocated index offset. */ + /** Last allocated pages count offset. */ private static final int LAST_PAGE_COUNT_OFF = LAST_SUCCESSFUL_FULL_SNAPSHOT_TAG_OFF + 8; - /** Candidate allocated index offset. */ + /** Candidate allocated page count offset. */ private static final int CANDIDATE_PAGE_COUNT_OFF = LAST_PAGE_COUNT_OFF + 4; /** End of page meta. */ @@ -82,7 +82,7 @@ public class PageMetaIO extends PageIO { setLastSuccessfulSnapshotId(pageAddr, 0); setNextSnapshotTag(pageAddr, 1); setLastSuccessfulSnapshotTag(pageAddr, 0); - setLastPageCount(pageAddr, 0); + setLastAllocatedPageCount(pageAddr, 0); setCandidatePageCount(pageAddr, 0); } @@ -179,24 +179,31 @@ public class PageMetaIO extends PageIO { } /** - * @param pageAddr Page address. - * @param pageCnt Last allocated index. + * Sets last allocated pages count, used to save and observe previous allocated count + * + * @param pageAddr Meta Page address. 
+ * @param pageCnt Last allocated pages count to set */ - public void setLastPageCount(long pageAddr, int pageCnt) { + public void setLastAllocatedPageCount(final long pageAddr, final int pageCnt) { PageUtils.putInt(pageAddr, LAST_PAGE_COUNT_OFF, pageCnt); } /** - * @param buf Buffer. + * Gets last allocated pages count from given buffer + * + * @param buf Buffer to read data from. */ - public int getLastPageCount(@NotNull ByteBuffer buf) { + public int getLastAllocatedPageCount(@NotNull final ByteBuffer buf) { return buf.getInt(LAST_PAGE_COUNT_OFF); } /** - * @param pageAddr Page address. + * Gets last allocated pages count by provided address + * + * @param pageAddr Meta page address. + * @return Last allocated page count */ - public int getLastPageCount(long pageAddr) { + public int getLastAllocatedPageCount(final long pageAddr) { return PageUtils.getInt(pageAddr, LAST_PAGE_COUNT_OFF); } http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java index 2263130..2051778 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/TrackingPageIO.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io; import java.nio.ByteBuffer; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; +import org.jetbrains.annotations.Nullable; /** * We use dedicated page for tracking pages 
updates. @@ -182,6 +183,7 @@ public class TrackingPageIO extends PageIO { * @param buf Buffer. * @param pageId Page id. * @param curSnapshotTag Snapshot tag. + * @param lastSuccessfulSnapshotTag Last successful snapshot id. * @param pageSize Page size. */ public boolean wasChanged(ByteBuffer buf, long pageId, long curSnapshotTag, long lastSuccessfulSnapshotTag, int pageSize) { @@ -265,10 +267,12 @@ public class TrackingPageIO extends PageIO { * @param buf Buffer. * @param start Start. * @param curSnapshotTag Snapshot id. + * @param lastSuccessfulSnapshotTag Last successful snapshot id. * @param pageSize Page size. - * @return set pageId if it was changed or next closest one, if there is no changed page null will be returned + * @return set pageId if it was changed or next closest one, if there is no changed page {@code null} will be returned */ - public Long findNextChangedPage(ByteBuffer buf, long start, long curSnapshotTag, long lastSuccessfulSnapshotTag, int pageSize) { + @Nullable public Long findNextChangedPage(ByteBuffer buf, long start, long curSnapshotTag, + long lastSuccessfulSnapshotTag, int pageSize) { validateSnapshotId(buf, curSnapshotTag + 1, lastSuccessfulSnapshotTag, pageSize); int cntOfPage = countOfPageToTrack(pageSize); http://git-wip-us.apache.org/repos/asf/ignite/blob/21964fb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index a87525a..3316980 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ 
-20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.util; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageSupport; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -440,7 +441,7 @@ public abstract class PageHandler { * @return {@code true} If we need to make a delta WAL record for the change in this page. */ public static boolean isWalDeltaRecordNeeded( - PageMemory pageMem, + PageSupport pageMem, int cacheId, long pageId, long page,