From commits-return-117202-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Tue Mar 13 17:52:25 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2D5EF18064F for ; Tue, 13 Mar 2018 17:52:23 +0100 (CET) Received: (qmail 46501 invoked by uid 500); 13 Mar 2018 16:52:22 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 46491 invoked by uid 99); 13 Mar 2018 16:52:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Mar 2018 16:52:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 213F2E96F2; Tue, 13 Mar 2018 16:52:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Message-Id: <4f058e0c2dd44f178eeed47f8ec224fc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-7751 Use throttling to protect from checkpoint buffer overflow - Fixes #3611. Date: Tue, 13 Mar 2018 16:52:22 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/master 8da7c9e1b -> 05d58bb6e IGNITE-7751 Use throttling to protect from checkpoint buffer overflow - Fixes #3611. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05d58bb6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05d58bb6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05d58bb6 Branch: refs/heads/master Commit: 05d58bb6efe35bfdd8bdb3e74dfd2627f6df7f9c Parents: 8da7c9e Author: Ivan Rakov Authored: Tue Mar 13 19:49:12 2018 +0300 Committer: Alexey Goncharuk Committed: Tue Mar 13 19:49:12 2018 +0300 ---------------------------------------------------------------------- .../internal/pagemem/PageIdAllocator.java | 4 +- .../pagemem/impl/PageMemoryNoStoreImpl.java | 2 +- .../GridCacheDatabaseSharedManager.java | 39 ++-- .../persistence/pagemem/PageMemoryImpl.java | 169 +++++++-------- .../pagemem/PagesWriteSpeedBasedThrottle.java | 16 +- .../persistence/pagemem/PagesWriteThrottle.java | 42 ++-- .../pagemem/BPlusTreePageMemoryImplTest.java | 2 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 3 +- ...gnitePageMemReplaceDelayedWriteUnitTest.java | 5 +- .../pagemem/IgniteThrottlingUnitTest.java | 1 - .../pagemem/IndexStoragePageMemoryImplTest.java | 3 +- .../pagemem/PageMemoryImplNoLoadTest.java | 3 +- .../persistence/pagemem/PageMemoryImplTest.java | 211 ++++++++++++++++++- 13 files changed, 363 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java index ae7da14..c6aeabe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java @@ -38,11 +38,11 @@ public interface PageIdAllocator { /** * Allocates a page from the space for the given partition ID and the given flags. * - * @param cacheId Cache Group ID. + * @param grpId Cache Group ID. * @param partId Partition ID. * @return Allocated page ID. */ - public long allocatePage(int cacheId, int partId, byte flags) throws IgniteCheckedException; + public long allocatePage(int grpId, int partId, byte flags) throws IgniteCheckedException; /** * The given page is free now. http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java index af1555e..7424af6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java @@ -254,7 +254,7 @@ public class PageMemoryNoStoreImpl implements PageMemory { } /** {@inheritDoc} */ - @Override public long allocatePage(int cacheId, int partId, byte flags) { + @Override public long allocatePage(int grpId, int partId, byte flags) { memMetrics.incrementTotalAllocatedPages(); long relPtr = borrowFreePage(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d6a8a30..997f89a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -193,6 +193,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private final int walRebalanceThreshold = IgniteSystemProperties.getInteger( IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000); + /** Value of property for throttling policy override. */ + private final String throttlingPolicyOverride = IgniteSystemProperties.getString( + IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED); + /** Checkpoint lock hold count. */ private static final ThreadLocal CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal() { @Override protected Integer initialValue() { @@ -932,19 +936,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chpBufSize = cacheSize; } - PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled() - ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED - : PageMemoryImpl.ThrottlingPolicy.NONE; - - String val = IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED); - - if (val != null) { - if ("ratio".equalsIgnoreCase(val)) - plc = PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED; - else if ("speed".equalsIgnoreCase(val) || Boolean.valueOf(val)) - plc = PageMemoryImpl.ThrottlingPolicy.SPEED_BASED; - } - GridInClosure3X changeTracker; if (trackable) @@ -985,7 +976,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan changeTracker, this, memMetrics, - plc, + resolveThrottlingPolicy(), this ); @@ -994,6 +985,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan return pageMem; } + /** + * Resolves throttling policy according to the settings. + */ + @NotNull private PageMemoryImpl.ThrottlingPolicy resolveThrottlingPolicy() { + PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled() + ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED + : PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY; + + if (throttlingPolicyOverride != null) { + try { + plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase()); + } + catch (IllegalArgumentException e) { + log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property: " + + throttlingPolicyOverride + ". Default throttling policy " + plc + " will be used."); + } + } + return plc; + } + /** {@inheritDoc} */ @Override protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index fa10a1f..9f979f5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -80,6 +80,7 @@ import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.lang.Boolean.FALSE; @@ -147,7 +148,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** Page ID offset */ public static final int PAGE_ID_OFFSET = 16; - /** Page cache ID offset. */ + /** Page cache group ID offset. */ public static final int PAGE_CACHE_ID_OFFSET = 24; /** Page pin counter offset. */ @@ -163,7 +164,7 @@ public class PageMemoryImpl implements PageMemoryEx { * 8b Marker/timestamp * 8b Relative pointer * 8b Page ID - * 4b Cache ID + * 4b Cache group ID * 4b Pin count * 8b Lock * 8b Temporary buffer @@ -176,6 +177,10 @@ public class PageMemoryImpl implements PageMemoryEx { /** Tracking io. */ private static final TrackingPageIO trackingIO = TrackingPageIO.VERSIONS.latest(); + /** Checkpoint pool overflow error message. */ + public static final String CHECKPOINT_POOL_OVERFLOW_ERROR_MSG = "Failed to allocate temporary buffer for checkpoint " + + "(increase checkpointPageBufferSize configuration property)"; + /** Page size. */ private final int sysPageSize; @@ -277,7 +282,7 @@ public class PageMemoryImpl implements PageMemoryEx { CheckpointLockStateChecker stateChecker, DataRegionMetricsImpl memMetrics, @Nullable ThrottlingPolicy throttlingPlc, - @Nullable CheckpointWriteProgressSupplier cpProgressProvider + @NotNull CheckpointWriteProgressSupplier cpProgressProvider ) { assert ctx != null; assert pageSize > 0; @@ -294,7 +299,7 @@ public class PageMemoryImpl implements PageMemoryEx { null; this.changeTracker = changeTracker; this.stateChecker = stateChecker; - this.throttlingPlc = throttlingPlc != null ? throttlingPlc : ThrottlingPolicy.NONE; + this.throttlingPlc = throttlingPlc != null ? throttlingPlc : ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY; this.cpProgressProvider = cpProgressProvider; storeMgr = ctx.pageStore(); @@ -363,22 +368,15 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * + * Resolves instance of {@link PagesWriteThrottlePolicy} according to chosen throttle policy. */ private void initWriteThrottle() { - if (!isThrottlingEnabled()) - return; - - if (cpProgressProvider == null) { - log.error("Write throttle can't start. CP progress provider not presented"); - - throttlingPlc = ThrottlingPolicy.NONE; - } - if (throttlingPlc == ThrottlingPolicy.SPEED_BASED) writeThrottle = new PagesWriteSpeedBasedThrottle(this, cpProgressProvider, stateChecker, log); - else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) - writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker); + else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) + writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker, false); + else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY) + writeThrottle = new PagesWriteThrottle(this, null, stateChecker, true); } /** {@inheritDoc} */ @@ -398,8 +396,8 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public void releasePage(int cacheId, long pageId, long page) { - Segment seg = segment(cacheId, pageId); + @Override public void releasePage(int grpId, long pageId, long page) { + Segment seg = segment(grpId, pageId); seg.readLock().lock(); @@ -412,18 +410,18 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public long readLock(int cacheId, long pageId, long page) { - return readLockPage(page, new FullPageId(pageId, cacheId), false); + @Override public long readLock(int grpId, long pageId, long page) { + return readLockPage(page, new FullPageId(pageId, grpId), false); } /** {@inheritDoc} */ - @Override public void readUnlock(int cacheId, long pageId, long page) { + @Override public void readUnlock(int grpId, long pageId, long page) { readUnlockPage(page); } /** {@inheritDoc} */ - @Override public long writeLock(int cacheId, long pageId, long page) { - return writeLock(cacheId, pageId, page, false); + @Override public long writeLock(int grpId, long pageId, long page) { + return writeLock(grpId, pageId, page, false); } /** {@inheritDoc} */ @@ -432,14 +430,14 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public long tryWriteLock(int cacheId, long pageId, long page) { - return tryWriteLockPage(page, new FullPageId(pageId, cacheId), true); + @Override public long tryWriteLock(int grpId, long pageId, long page) { + return tryWriteLockPage(page, new FullPageId(pageId, grpId), true); } /** {@inheritDoc} */ - @Override public void writeUnlock(int cacheId, long pageId, long page, Boolean walPlc, + @Override public void writeUnlock(int grpId, long pageId, long page, Boolean walPlc, boolean dirtyFlag) { - writeUnlock(cacheId, pageId, page, walPlc, dirtyFlag, false); + writeUnlock(grpId, pageId, page, walPlc, dirtyFlag, false); } /** {@inheritDoc} */ @@ -449,12 +447,12 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public boolean isDirty(int cacheId, long pageId, long page) { + @Override public boolean isDirty(int grpId, long pageId, long page) { return isDirty(page); } /** {@inheritDoc} */ - @Override public long allocatePage(int cacheId, int partId, byte flags) throws IgniteCheckedException { + @Override public long allocatePage(int grpId, int partId, byte flags) throws IgniteCheckedException { assert flags == PageIdAllocator.FLAG_DATA && partId <= PageIdAllocator.MAX_PARTITION_ID || flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION : "flags = " + flags + ", partId = " + partId; @@ -464,19 +462,19 @@ public class PageMemoryImpl implements PageMemoryEx { if (isThrottlingEnabled()) writeThrottle.onMarkDirty(false); - long pageId = storeMgr.allocatePage(cacheId, partId, flags); + long pageId = storeMgr.allocatePage(grpId, partId, flags); assert PageIdUtils.pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one) // We need to allocate page in memory for marking it dirty to save it in the next checkpoint. // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error // because there is no crc inside them. - Segment seg = segment(cacheId, pageId); + Segment seg = segment(grpId, pageId); DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null ? delayedPageReplacementTracker.delayedPageWrite() : null; - FullPageId fullId = new FullPageId(pageId, cacheId); + FullPageId fullId = new FullPageId(pageId, grpId); seg.writeLock().lock(); @@ -484,15 +482,15 @@ public class PageMemoryImpl implements PageMemoryEx { try { long relPtr = seg.loadedPages.get( - cacheId, + grpId, PageIdUtils.effectivePageId(pageId), - seg.partGeneration(cacheId, partId), + seg.partGeneration(grpId, partId), INVALID_REL_PTR, OUTDATED_REL_PTR ); if (relPtr == OUTDATED_REL_PTR) - relPtr = refreshOutdatedPage(seg, cacheId, pageId, false); + relPtr = refreshOutdatedPage(seg, grpId, pageId, false); if (relPtr == INVALID_REL_PTR) relPtr = seg.borrowOrAllocateFreePage(pageId); @@ -528,7 +526,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (!ctx.wal().isAlwaysWriteFullPages()) ctx.wal().log( new InitNewPageRecord( - cacheId, + grpId, pageId, trackingIO.getType(), trackingIO.getVersion(), pageId @@ -539,7 +537,7 @@ public class PageMemoryImpl implements PageMemoryEx { } } - seg.loadedPages.put(cacheId, PageIdUtils.effectivePageId(pageId), relPtr, seg.partGeneration(cacheId, partId)); + seg.loadedPages.put(grpId, PageIdUtils.effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId)); } catch (IgniteOutOfMemoryException oom) { DataRegionConfiguration dataRegionCfg = getDataRegionConfiguration(); @@ -562,7 +560,7 @@ public class PageMemoryImpl implements PageMemoryEx { } //we have allocated 'tracking' page, we need to allocate regular one - return isTrackingPage ? allocatePage(cacheId, partId, flags) : pageId; + return isTrackingPage ? allocatePage(grpId, partId, flags) : pageId; } /** @@ -596,7 +594,7 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public boolean freePage(int cacheId, long pageId) throws IgniteCheckedException { + @Override public boolean freePage(int grpId, long pageId) throws IgniteCheckedException { assert false : "Free page should be never called directly when persistence is enabled."; return false; @@ -613,25 +611,25 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public long acquirePage(int cacheId, long pageId) throws IgniteCheckedException { - return acquirePage(cacheId, pageId, false); + @Override public long acquirePage(int grpId, long pageId) throws IgniteCheckedException { + return acquirePage(grpId, pageId, false); } /** {@inheritDoc} */ - @Override public long acquirePage(int cacheId, long pageId, boolean restore) throws IgniteCheckedException { - FullPageId fullId = new FullPageId(pageId, cacheId); + @Override public long acquirePage(int grpId, long pageId, boolean restore) throws IgniteCheckedException { + FullPageId fullId = new FullPageId(pageId, grpId); int partId = PageIdUtils.partId(pageId); - Segment seg = segment(cacheId, pageId); + Segment seg = segment(grpId, pageId); seg.readLock().lock(); try { long relPtr = seg.loadedPages.get( - cacheId, + grpId, PageIdUtils.effectivePageId(pageId), - seg.partGeneration(cacheId, partId), + seg.partGeneration(grpId, partId), INVALID_REL_PTR, INVALID_REL_PTR ); @@ -660,9 +658,9 @@ public class PageMemoryImpl implements PageMemoryEx { try { // Double-check. long relPtr = seg.loadedPages.get( - cacheId, + grpId, PageIdUtils.effectivePageId(pageId), - seg.partGeneration(cacheId, partId), + seg.partGeneration(grpId, partId), INVALID_REL_PTR, OUTDATED_REL_PTR ); @@ -688,10 +686,10 @@ public class PageMemoryImpl implements PageMemoryEx { setDirty(fullId, absPtr, false, false); seg.loadedPages.put( - cacheId, + grpId, PageIdUtils.effectivePageId(pageId), relPtr, - seg.partGeneration(cacheId, partId) + seg.partGeneration(grpId, partId) ); long pageAddr = absPtr + PAGE_OVERHEAD; @@ -722,7 +720,7 @@ public class PageMemoryImpl implements PageMemoryEx { else if (relPtr == OUTDATED_REL_PTR) { assert PageIdUtils.pageIndex(pageId) == 0 : fullId; - relPtr = refreshOutdatedPage(seg, cacheId, pageId, false); + relPtr = refreshOutdatedPage(seg, grpId, pageId, false); absPtr = seg.absolute(relPtr); @@ -764,7 +762,7 @@ public class PageMemoryImpl implements PageMemoryEx { ByteBuffer buf = wrapPointer(pageAddr, pageSize()); try { - storeMgr.read(cacheId, pageId, buf); + storeMgr.read(grpId, pageId, buf); } catch (IgniteDataIntegrityViolationException ignore) { U.warn(log, "Failed to read page (data integrity violation encountered, will try to " + @@ -783,17 +781,17 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param seg Segment. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param pageId Page ID. * @param rmv {@code True} if page should be removed. * @return Relative pointer to refreshed page. */ - private long refreshOutdatedPage(Segment seg, int cacheId, long pageId, boolean rmv) { + private long refreshOutdatedPage(Segment seg, int grpId, long pageId, boolean rmv) { assert seg.writeLock().isHeldByCurrentThread(); - int tag = seg.partGeneration(cacheId, PageIdUtils.partId(pageId)); + int tag = seg.partGeneration(grpId, PageIdUtils.partId(pageId)); - long relPtr = seg.loadedPages.refresh(cacheId, PageIdUtils.effectivePageId(pageId), tag); + long relPtr = seg.loadedPages.refresh(grpId, PageIdUtils.effectivePageId(pageId), tag); long absPtr = seg.absolute(relPtr); @@ -815,13 +813,13 @@ public class PageMemoryImpl implements PageMemoryEx { } if (rmv) - seg.loadedPages.remove(cacheId, PageIdUtils.effectivePageId(pageId)); + seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId)); if (seg.segCheckpointPages != null) - seg.segCheckpointPages.remove(new FullPageId(pageId, cacheId)); + seg.segCheckpointPages.remove(new FullPageId(pageId, grpId)); if (seg.dirtyPages != null) - seg.dirtyPages.remove(new FullPageId(pageId, cacheId)); + seg.dirtyPages.remove(new FullPageId(pageId, grpId)); return relPtr; } @@ -991,7 +989,7 @@ public class PageMemoryImpl implements PageMemoryEx { memMetrics.resetDirtyPages(); - if (isThrottlingEnabled()) + if (throttlingPlc != ThrottlingPolicy.DISABLED) writeThrottle.onBeginCheckpoint(); return new GridMultiCollectionWrapper<>(collections); @@ -1001,7 +999,7 @@ public class PageMemoryImpl implements PageMemoryEx { * @return {@code True} if throttling is enabled. */ private boolean isThrottlingEnabled() { - return throttlingPlc != ThrottlingPolicy.NONE; + return throttlingPlc != ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY && throttlingPlc != ThrottlingPolicy.DISABLED; } /** {@inheritDoc} */ @@ -1013,7 +1011,7 @@ public class PageMemoryImpl implements PageMemoryEx { for (Segment seg : segments) seg.segCheckpointPages = null; - if (isThrottlingEnabled()) + if (throttlingPlc != ThrottlingPolicy.DISABLED) writeThrottle.onFinishCheckpoint(); } @@ -1340,8 +1338,8 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public long readLockForce(int cacheId, long pageId, long page) { - return readLockPage(page, new FullPageId(pageId, cacheId), true); + @Override public long readLockForce(int grpId, long pageId, long page) { + return readLockPage(page, new FullPageId(pageId, grpId), true); } /** @@ -1400,9 +1398,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (tmpRelPtr == INVALID_REL_PTR) { rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); - throw new IgniteException( - "Failed to allocate temporary buffer for checkpoint " + - "(increase checkpointPageBufferSize configuration property)"); + throw new IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + ": " + memMetrics.getName()); } // Pin the page until checkpoint is not finished. @@ -1472,7 +1468,7 @@ public class PageMemoryImpl implements PageMemoryEx { try { rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); - if (isThrottlingEnabled() && !restore && markDirty && !wasDirty) + if (throttlingPlc != ThrottlingPolicy.DISABLED && !restore && markDirty && !wasDirty) writeThrottle.onMarkDirty(isInCheckpoint(fullId)); } catch (AssertionError ex) { @@ -1847,7 +1843,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** Initial partition generation. */ private static final int INIT_PART_GENERATION = 1; - /** Maps partition (cacheId, partId) to its generation. Generation is 1-based incrementing partition counter. */ + /** Maps partition (grpId, partId) to its generation. Generation is 1-based incrementing partition counter. */ private final Map partGenerationMap = new HashMap<>(); /** */ @@ -1882,7 +1878,7 @@ public class PageMemoryImpl implements PageMemoryEx { pool = new PagePool(idx, poolRegion, null); - maxDirtyPages = throttlingPlc != ThrottlingPolicy.NONE + maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages); } @@ -2599,37 +2595,37 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * Reads cache ID from the page at the given absolute pointer. + * Reads cache group ID from the page at the given absolute pointer. * * @param absPtr Absolute memory pointer to the page header. - * @return Cache ID written to the page. + * @return Cache group ID written to the page. */ - private static int readPageCacheId(final long absPtr) { + private static int readPageGroupId(final long absPtr) { return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET); } /** - * Writes cache ID from the page at the given absolute pointer. + * Writes cache group ID from the page at the given absolute pointer. * * @param absPtr Absolute memory pointer to the page header. - * @param cacheId Cache ID to write. + * @param grpId Cache group ID to write. */ - private static void pageCacheId(final long absPtr, final int cacheId) { - GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, cacheId); + private static void pageGroupId(final long absPtr, final int grpId) { + GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId); } /** - * Reads page ID and cache ID from the page at the given absolute pointer. + * Reads page ID and cache group ID from the page at the given absolute pointer. * * @param absPtr Absolute memory pointer to the page header. * @return Full page ID written to the page. */ private static FullPageId fullPageId(final long absPtr) { - return new FullPageId(readPageId(absPtr), readPageCacheId(absPtr)); + return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr)); } /** - * Writes page ID and cache ID from the page at the given absolute pointer. + * Writes page ID and cache group ID from the page at the given absolute pointer. * * @param absPtr Absolute memory pointer to the page header. * @param fullPageId Full page ID to write. @@ -2637,7 +2633,7 @@ public class PageMemoryImpl implements PageMemoryEx { private static void fullPageId(final long absPtr, final FullPageId fullPageId) { pageId(absPtr, fullPageId.pageId()); - pageCacheId(absPtr, fullPageId.groupId()); + pageGroupId(absPtr, fullPageId.groupId()); } } @@ -2734,8 +2730,13 @@ public class PageMemoryImpl implements PageMemoryEx { * Throttling enabled and its type enum. */ public enum ThrottlingPolicy { - /** Not throttled. */NONE, - /** Target ratio based: CP progress is used as border. */ TARGET_RATIO_BASED, - /** Speed based. CP writting speed and estimated ideal speed are used as border */ SPEED_BASED + /** All ways of throttling are disabled. */ + DISABLED, + /** Only exponential throttling is used to protect from CP buffer overflow. */ + CHECKPOINT_BUFFER_ONLY, + /** Target ratio based: CP progress is used as border. */ + TARGET_RATIO_BASED, + /** Speed based. CP writting speed and estimated ideal speed are used as border */ + SPEED_BASED } } http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java index aaf5471..68fa529 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java @@ -196,21 +196,23 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { markDirtySpeed, curCpWriteSpeed); - level = ThrottleMode.LIMITED; + level = throttleParkTimeNs == 0 ? ThrottleMode.NO : ThrottleMode.LIMITED; } } } - if (level == ThrottleMode.NO) { - exponentialBackoffCntr.set(0); - - throttleParkTimeNs = 0; - } - else if (level == ThrottleMode.EXPONENTIAL) { + if (level == ThrottleMode.EXPONENTIAL) { int exponent = exponentialBackoffCntr.getAndIncrement(); throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, exponent)); } + else { + if (isPageInCheckpoint) + exponentialBackoffCntr.set(0); + + if (level == ThrottleMode.NO) + throttleParkTimeNs = 0; + } if (throttleParkTimeNs > 0) { recurrentLogIfNeed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java index 78e5344..166cdcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -32,6 +32,9 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { /** Database manager. */ private final CheckpointWriteProgressSupplier cpProgress; + /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */ + private final boolean throttleOnlyPagesInCheckpoint; + /** Checkpoint lock state checker. */ private CheckpointLockStateChecker stateChecker; @@ -41,30 +44,36 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { /** Backoff ratio. Each next park will be this times longer. */ private static final double BACKOFF_RATIO = 1.05; - /** Exponential backoff counter. */ - private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); + /** Counter for dirty pages ratio throttling. */ + private final AtomicInteger notInCheckpointBackoffCntr = new AtomicInteger(0); + + /** Counter for checkpoint buffer usage ratio throttling (we need a separate one due to IGNITE-7751). */ + private final AtomicInteger inCheckpointBackoffCntr = new AtomicInteger(0); + /** * @param pageMemory Page memory. * @param cpProgress Database manager. * @param stateChecker checkpoint lock state checker. + * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. */ public PagesWriteThrottle(PageMemoryImpl pageMemory, CheckpointWriteProgressSupplier cpProgress, - CheckpointLockStateChecker stateChecker) { + CheckpointLockStateChecker stateChecker, + boolean throttleOnlyPagesInCheckpoint + ) { this.pageMemory = pageMemory; this.cpProgress = cpProgress; this.stateChecker = stateChecker; + this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + + if (!throttleOnlyPagesInCheckpoint) + assert cpProgress != null : "cpProgress must be not null if ratio based throttling mode is used"; } /** {@inheritDoc} */ @Override public void onMarkDirty(boolean isPageInCheckpoint) { assert stateChecker.checkpointLockIsHeldByThread(); - AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); - - if (writtenPagesCntr == null) - return; // Don't throttle if checkpoint is not running. - boolean shouldThrottle = false; if (isPageInCheckpoint) { @@ -73,7 +82,12 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; } - if (!shouldThrottle) { + if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) { + AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); + + if (writtenPagesCntr == null) + return; // Don't throttle if checkpoint is not running. + int cpWrittenPages = writtenPagesCntr.get(); int cpTotalPages = cpProgress.currentCheckpointPagesCount(); @@ -92,13 +106,15 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { } } + AtomicInteger cntr = isPageInCheckpoint ? inCheckpointBackoffCntr : notInCheckpointBackoffCntr; + if (shouldThrottle) { - int throttleLevel = exponentialBackoffCntr.getAndIncrement(); + int throttleLevel = cntr.getAndIncrement(); LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel))); } else - exponentialBackoffCntr.set(0); + cntr.set(0); } /** {@inheritDoc} */ @@ -107,6 +123,8 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { /** {@inheritDoc} */ @Override public void onFinishCheckpoint() { - exponentialBackoffCntr.set(0); + inCheckpointBackoffCntr.set(0); + + notInCheckpointBackoffCntr.set(0); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index a305b7f..3737204 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -76,7 +76,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { }, () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE, + PageMemoryImpl.ThrottlingPolicy.DISABLED, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index 1fc34c5..0c786ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.database.BPlusTreeReuseSelfTest; @@ -77,7 +76,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest }, () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE, + PageMemoryImpl.ThrottlingPolicy.DISABLED, null ); http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java index a87a61e..c6f42e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java @@ -223,9 +223,8 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest { DirectMemoryProvider provider = new UnsafeMemoryProvider(log); - PageMemoryImpl memory - = new PageMemoryImpl(provider, sizes, sctx, pageSize, - pageWriter, null, () -> true, memMetrics, null, null); + PageMemoryImpl memory = new PageMemoryImpl(provider, sizes, sctx, pageSize, + pageWriter, null, () -> true, memMetrics, PageMemoryImpl.ThrottlingPolicy.DISABLED, null); memory.start(); return memory; http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java index 1cef087..f9ca7e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.logger.NullLogger; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index 4495dc1..9087b1c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.database.IndexStorageSelfTest; @@ -92,7 +91,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { }, () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE, + PageMemoryImpl.ThrottlingPolicy.DISABLED, null ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 3c169be..34fd93b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -86,7 +86,8 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE, null + PageMemoryImpl.ThrottlingPolicy.DISABLED, + null ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 8f0ef39..31af118 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -17,24 +17,40 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger; +import org.mockito.Mockito; + +import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.CHECKPOINT_POOL_OVERFLOW_ERROR_MSG; /** * @@ -46,11 +62,14 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { /** Page size. */ private static final int PAGE_SIZE = 1024; + /** Max memory size. */ + private static final int MAX_SIZE = 128; + /** * @throws Exception if failed. */ public void testThatAllocationTooMuchPagesCauseToOOMException() throws Exception { - PageMemoryImpl memory = createPageMemory(); + PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED); try { while (!Thread.currentThread().isInterrupted()) @@ -64,15 +83,186 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { } /** - * + * @throws Exception If failed. + */ + public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws Exception { + PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED); + + List pages = new ArrayList<>(); + + try { + while (!Thread.currentThread().isInterrupted()) { + long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX); + + FullPageId fullPageId = new FullPageId(pageId, 1); + + pages.add(fullPageId); + + acquireAndReleaseWriteLock(memory, fullPageId); //to set page id, otherwise we would fail with assertion error + } + } + catch (IgniteOutOfMemoryException ignore) { + //Success + } + + memory.beginCheckpoint(); + + final AtomicReference lastPage = new AtomicReference<>(); + + try { + for (FullPageId fullPageId : pages) { + lastPage.set(fullPageId); + + acquireAndReleaseWriteLock(memory, fullPageId); + } + } + catch (Exception ex) { + assertTrue(ex.getMessage().startsWith(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG)); + } + + memory.finishCheckpoint(); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + acquireAndReleaseWriteLock(memory, lastPage.get()); //we should be able get lock again + } + catch (IgniteCheckedException e) { + throw new AssertionError(e); + } + } + }).get(getTestTimeout()); + } + + /** + * Tests that checkpoint buffer won't be overflowed with enabled CHECKPOINT_BUFFER_ONLY throttling. + * @throws Exception If failed. */ - private PageMemoryImpl createPageMemory() throws Exception { + public void testCheckpointBufferCantOverflowMixedLoad() throws Exception { + testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY); + } + + /** + * Tests that checkpoint buffer won't be overflowed with enabled SPEED_BASED throttling. + * @throws Exception If failed. + */ + public void testCheckpointBufferCantOverflowMixedLoadSpeedBased() throws Exception { + testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED); + } + + /** + * Tests that checkpoint buffer won't be overflowed with enabled TARGET_RATIO_BASED throttling. + * @throws Exception If failed. + */ + public void testCheckpointBufferCantOverflowMixedLoadRatioBased() throws Exception { + testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED); + } + + /** + * @throws Exception If failed. + */ + private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy plc) throws Exception { + PageMemoryImpl memory = createPageMemory(plc); + + List pages = new ArrayList<>(); + + for (int i = 0; i < (MAX_SIZE - 10) * MB / PAGE_SIZE / 2; i++) { + long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX); + + FullPageId fullPageId = new FullPageId(pageId, 1); + + pages.add(fullPageId); + + acquireAndReleaseWriteLock(memory, fullPageId); + } + + memory.beginCheckpoint(); + + CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class); + + for (FullPageId checkpointPage : pages) + memory.getForCheckpoint(checkpointPage, ByteBuffer.allocate(PAGE_SIZE), mockTracker); + + memory.finishCheckpoint(); + + for (int i = (int)((MAX_SIZE - 10) * MB / PAGE_SIZE / 2); i < (MAX_SIZE - 20) * MB / PAGE_SIZE; i++) { + long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX); + + FullPageId fullPageId = new FullPageId(pageId, 1); + + pages.add(fullPageId); + + acquireAndReleaseWriteLock(memory, fullPageId); + } + + memory.beginCheckpoint(); + + Collections.shuffle(pages); // Mix pages in checkpoint with clean pages + + AtomicBoolean stop = new AtomicBoolean(false); + + try { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + for (FullPageId page : pages) { + if (ThreadLocalRandom.current().nextDouble() < 0.5) // Mark dirty 50% of pages + try { + acquireAndReleaseWriteLock(memory, page); + + if (stop.get()) + break; + } + catch (IgniteCheckedException e) { + log.error("runAsync ended with exception", e); + + fail(); + } + } + } + }).get(5_000); + } + catch (IgniteFutureTimeoutCheckedException ex) { + // Expected. + } + finally { + stop.set(true); + } + + memory.finishCheckpoint(); + } + + /** + * @param memory Memory. + * @param fullPageId Full page id. + * @throws IgniteCheckedException If acquiring lock failed. + */ + private void acquireAndReleaseWriteLock(PageMemoryImpl memory, FullPageId fullPageId) throws IgniteCheckedException { + long page = memory.acquirePage(1, fullPageId.pageId()); + + long address = memory.writeLock(1, fullPageId.pageId(), page); + + PageIO.setPageId(address, fullPageId.pageId()); + + PageIO.setType(address, PageIO.T_BPLUS_META); + + PageUtils.putShort(address, PageIO.VER_OFF, (short)1); + + memory.writeUnlock(1, fullPageId.pageId(), page, Boolean.FALSE, true); + + memory.releasePage(1, fullPageId.pageId(), page); + } + + /** + * @param throttlingPlc Throttling Policy. + * @throws Exception If creating mock failed. + */ + private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception { long[] sizes = new long[5]; for (int i = 0; i < sizes.length; i++) - sizes[i] = 1024 * MB / 4; + sizes[i] = MAX_SIZE * MB / 4; - sizes[4] = 10 * MB; + sizes[4] = 5 * MB; DirectMemoryProvider provider = new UnsafeMemoryProvider(log); @@ -101,6 +291,13 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { null ); + CheckpointWriteProgressSupplier noThrottle = Mockito.mock(CheckpointWriteProgressSupplier.class); + + Mockito.when(noThrottle.currentCheckpointPagesCount()).thenReturn(1_000_000); + Mockito.when(noThrottle.evictedPagesCntr()).thenReturn(new AtomicInteger(0)); + Mockito.when(noThrottle.syncedPagesCounter()).thenReturn(new AtomicInteger(1_000_000)); + Mockito.when(noThrottle.writtenPagesCounter()).thenReturn(new AtomicInteger(1_000_000)); + PageMemoryImpl mem = new PageMemoryImpl( provider, sizes, @@ -118,8 +315,8 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { } }, new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE, - null + throttlingPlc, + noThrottle ); mem.start();