Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 83EFE200CA4 for ; Wed, 7 Jun 2017 20:09:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 827B2160BE6; Wed, 7 Jun 2017 18:09:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AC50E160BBF for ; Wed, 7 Jun 2017 20:09:11 +0200 (CEST) Received: (qmail 20745 invoked by uid 500); 7 Jun 2017 18:09:09 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 20731 invoked by uid 99); 7 Jun 2017 18:09:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jun 2017 18:09:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AEBD3DFBDA; Wed, 7 Jun 2017 18:09:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Wed, 07 Jun 2017 18:09:09 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] ignite git commit: Merged 4.ea2 into 5267 archived-at: Wed, 07 Jun 2017 18:09:13 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5267-merge-ea [created] 518238473 http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index ff5dfa9..1faffde 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -40,7 +40,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -77,6 +76,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -123,21 +123,22 @@ import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridInClosure3X; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P3; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.mxbean.PersistenceMetricsMXBean; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_PARTITION_DESTROY_CHECKPOINT_DELAY; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; @@ -155,10 +156,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false); - /** Checkpoint initiation delay after a partition has been scheduled for destroy. */ - private volatile long partDestroyCheckpointDelay = - IgniteSystemProperties.getLong(IGNITE_PDS_PARTITION_DESTROY_CHECKPOINT_DELAY, 30_000); - /** */ private final int walRebalanceThreshold = IgniteSystemProperties.getInteger( IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000); @@ -184,6 +181,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan }; /** */ + private static final Comparator ASC_PART_COMPARATOR = new Comparator() { + @Override public int compare(GridDhtLocalPartition a, GridDhtLocalPartition b) { + return Integer.compare(a.id(), b.id()); + } + }; + + /** */ private static final Comparator CP_TS_COMPARATOR = new Comparator() { /** {@inheritDoc} */ @Override public int compare(File o1, File o2) { @@ -702,6 +706,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointer = null; + cp.scheduledCp.cpFinishFut.onDone( + new NodeStoppingException("Checkpointer is stopped during node stop.")); + break; } catch (IgniteInterruptedCheckedException ignored) { @@ -1114,20 +1121,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Cancels partition destroy if it has not begun yet. Otherwise, will wait for cleanup to finish. - * - * @param grpId Cache group ID. - * @param partId Partition ID. - */ - void cancelOrWaitPartitionDestroy(int grpId, int partId) - throws IgniteCheckedException { - Checkpointer cp = checkpointer; - - if (cp != null) - cp.cancelOrWaitPartitionDestroy(grpId, partId); - } - - /** * Tries to search for a WAL pointer for the given partition counter start. * * @param grpId Cache group ID. @@ -1447,8 +1440,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (status.needRestoreMemory()) { - assert !apply : "Restoring memory state failed, checkpoint marker [cpId=" + status.cpStartId + - "] was not found in WAL"; + if (apply) + throw new IgniteCheckedException("Failed to restore memory state (checkpoint marker is present " + + "on disk, but checkpoint record is missed in WAL) " + + "[cpStatus=" + status + ", lastRead=" + lastRead + "]"); log.info("Finished applying memory changes [changesApplied=" + applied + ", time=" + (U.currentTimeMillis() - start) + "ms]"); @@ -1809,64 +1804,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param reqs Destroy requests. - */ - @SuppressWarnings("TypeMayBeWeakened") - private void finishDestroyPartitionsAsync(final Collection reqs) { - final Map> filterMap = new HashMap<>(); - - final Set pageMemSet = new HashSet<>(); - - for (PartitionDestroyRequest req : reqs) { - Collection partIds = filterMap.get(req.grpId); - - if (partIds == null) { - partIds = new HashSet<>(); - - filterMap.put(req.grpId, partIds); - } - - partIds.add(req.partId); - - pageMemSet.add((PageMemoryEx)req.memPlc.pageMemory()); - } - - for (PageMemoryEx pageMem : pageMemSet) { - IgniteInternalFuture clearFut = pageMem.clearAsync(new P3() { - @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { - assert grpId != null; - assert pageId != null; - - int partId = PageIdUtils.partId(pageId); - - Collection parts = filterMap.get(grpId); - - return parts != null && parts.contains(partId); - } - }, true); - - clearFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture clearFut) { - for (PartitionDestroyRequest req : reqs) { - try { - assert !req.allowFastEviction; - - // Tag should never grow in this case. - cctx.pageStore().onPartitionDestroyed(req.grpId, req.partId, 1); - } - catch (IgniteCheckedException e) { - req.onDone(e); - } - finally { - req.onDone(clearFut.error()); - } - } - } - }); - } - } - - /** * */ @SuppressWarnings("NakedNotify") @@ -1923,6 +1860,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // Final run after the cancellation. if (checkpointsEnabled && !shutdownNow) doCheckpoint(); + + scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); } /** @@ -1979,43 +1918,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param grp Cache group. - * @param partId Partition ID. - */ - private void schedulePartitionDestroy(CacheGroupContext grp, int partId) { - synchronized (this) { - scheduledCp.destroyQueue.addDestroyRequest(grp, partId); - } - - wakeupForCheckpoint(partDestroyCheckpointDelay, "partition destroy"); - } - - /** - * @param grpId Cache group ID. - * @param partId Partition ID. - */ - private void cancelOrWaitPartitionDestroy(int grpId, int partId) - throws IgniteCheckedException { - CheckpointProgress cur = curCpProgress; - - PartitionDestroyRequest req; - - if (cur != null) { - req = cur.destroyQueue.cancelDestroy(grpId, partId); - - if (req != null) - req.waitCompleted(); - } - - synchronized (this) { - req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId); - } - - if (req != null) - req.waitCompleted(); - } - - /** * */ private void doCheckpoint() { @@ -2031,59 +1933,71 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean interrupted = true; try { - // Identity stores set. - GridConcurrentHashSet updStores = new GridConcurrentHashSet<>(); + if (chp.hasDelta()) { + // Identity stores set. + GridConcurrentHashSet updStores = new GridConcurrentHashSet<>(); - CountDownFuture doneWriteFut = new CountDownFuture( - asyncRunner == null ? 1 : chp.cpPages.collectionsSize()); + CountDownFuture doneWriteFut = new CountDownFuture( + asyncRunner == null ? 1 : chp.cpPages.collectionsSize()); - tracker.onPagesWriteStart(); + tracker.onPagesWriteStart(); - if (asyncRunner != null) { - for (int i = 0; i < chp.cpPages.collectionsSize(); i++) { - Runnable write = new WriteCheckpointPages( - tracker, - chp.cpPages.innerCollection(i), - updStores, - doneWriteFut - ); + if (asyncRunner != null) { + for (int i = 0; i < chp.cpPages.collectionsSize(); i++) { + Runnable write = new WriteCheckpointPages( + tracker, + chp.cpPages.innerCollection(i), + updStores, + doneWriteFut + ); - try { - asyncRunner.execute(write); - } - catch (RejectedExecutionException ignore) { - // Run the task synchronously. - write.run(); + try { + asyncRunner.execute(write); + } + catch (RejectedExecutionException ignore) { + // Run the task synchronously. + write.run(); + } } } - } - else { - // Single-threaded checkpoint. - Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut); + else { + // Single-threaded checkpoint. + Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut); - write.run(); - } + write.run(); + } + + // Wait and check for errors. + doneWriteFut.get(); - // Wait and check for errors. - doneWriteFut.get(); + // Must re-check shutdown flag here because threads may have skipped some pages. + // If so, we should not put finish checkpoint mark. + if (shutdownNow) { + chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); - // Must re-check shutdown flag here because threads may have skipped some pages. - // If so, we should not put finish checkpoint mark. - if (shutdownNow) - return; + return; + } - snapshotMgr.afterCheckpointPageWritten(); + snapshotMgr.afterCheckpointPageWritten(); - tracker.onFsyncStart(); + tracker.onFsyncStart(); - if (!skipSync) { - for (PageStore updStore : updStores) { - if (shutdownNow) - return; + if (!skipSync) { + for (PageStore updStore : updStores) { + if (shutdownNow) { + chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping.")); + + return; + } - updStore.sync(); + updStore.sync(); + } } } + else { + tracker.onPagesWriteStart(); + tracker.onFsyncStart(); + } // Must mark successful checkpoint only if there are no exceptions or interrupts. interrupted = false; @@ -2095,58 +2009,43 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tracker.onEnd(); - // We finished this checkpoint, now it's time to clean up partitions. - PartitionDestroyQueue destroyQueue = chp.progress.destroyQueue; - - Collection reqs = null; - WALPointer lastPtr = null; - - for (T2 destroyId : destroyQueue.pendingReqs.keySet()) { - PartitionDestroyRequest req = destroyQueue.beginDestroy(destroyId); - - if (req != null) { - // Log destroy record before actual partition clear. - lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.grpId, req.partId)); - - if (reqs == null) - reqs = new ArrayList<>(); - - reqs.add(req); + if (chp.hasDelta()) { + if (printCheckpointStats) { + if (log.isInfoEnabled()) + log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + + "walSegmentsCleared=%d, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " + + "total=%dms]", + chp.cpEntry.checkpointId(), + pages, + chp.cpEntry.checkpointMark(), + chp.walFilesDeleted, + tracker.markDuration(), + tracker.pagesWriteDuration(), + tracker.fsyncDuration(), + tracker.totalDuration())); } - } - - if (reqs != null) { - assert lastPtr != null; - cctx.wal().fsync(lastPtr); - - finishDestroyPartitionsAsync(reqs); + persStoreMetrics.onCheckpoint( + tracker.lockWaitDuration(), + tracker.markDuration(), + tracker.pagesWriteDuration(), + tracker.fsyncDuration(), + tracker.totalDuration(), + pages, + tracker.dataPagesWritten(), + tracker.cowPagesWritten()); } - - if (printCheckpointStats) { - if (log.isInfoEnabled()) - log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + - "walSegmentsCleared=%d, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " + - "total=%dms]", - chp.cpEntry.checkpointId(), - pages, - chp.cpEntry.checkpointMark(), - chp.walFilesDeleted, - tracker.markDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration())); + else { + persStoreMetrics.onCheckpoint( + tracker.lockWaitDuration(), + tracker.markDuration(), + tracker.pagesWriteDuration(), + tracker.fsyncDuration(), + tracker.totalDuration(), + pages, + tracker.dataPagesWritten(), + tracker.cowPagesWritten()); } - - persStoreMetrics.onCheckpoint( - tracker.lockWaitDuration(), - tracker.markDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration(), - pages, - tracker.dataPagesWritten(), - tracker.cowPagesWritten()); } catch (IgniteCheckedException e) { // TODO-ignite-db how to handle exception? @@ -2191,7 +2090,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException { CheckpointRecord cpRec = new CheckpointRecord(null, false); - WALPointer cpPtr; + WALPointer cpPtr = null; GridMultiCollectionWrapper cpPages; @@ -2239,7 +2138,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (grp.isLocal()) continue; - CacheState state = new CacheState(); + List locParts = new ArrayList<>(); + + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) + locParts.add(part); + + Collections.sort(locParts, ASC_PART_COMPARATOR); + + CacheState state = new CacheState(locParts.size()); for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate()); @@ -2250,12 +2156,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (curr.nextSnapshot) snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); - // No page updates for this checkpoint are allowed from now on. - cpPtr = cctx.wal().log(cpRec); - - if (cpPtr == null) - cpPtr = CheckpointStatus.NULL_PTR; - IgniteBiTuple>, Integer> tup = beginAllCheckpoints(); // Todo it maybe more optimally @@ -2267,6 +2167,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } cpPages = new GridMultiCollectionWrapper<>(cpPagesList); + + if (!F.isEmpty(cpPages)) { + // No page updates for this checkpoint are allowed from now on. + cpPtr = cctx.wal().log(cpRec); + + if (cpPtr == null) + cpPtr = CheckpointStatus.NULL_PTR; + } } finally { checkpointLock.writeLock().unlock(); @@ -2276,34 +2184,50 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan curr.cpBeginFut.onDone(); - // Sync log outside the checkpoint write lock. - cctx.wal().fsync(cpPtr); + if (!F.isEmpty(cpPages)) { + assert cpPtr != null; - long cpTs = System.currentTimeMillis(); + // Sync log outside the checkpoint write lock. + cctx.wal().fsync(cpPtr); - CheckpointEntry cpEntry = writeCheckpointEntry( - tmpWriteBuf, - cpTs, - cpRec.checkpointId(), - cpPtr, - cpRec, - CheckpointEntryType.START); + long cpTs = System.currentTimeMillis(); - checkpointHist.addCheckpointEntry(cpEntry); + CheckpointEntry cpEntry = writeCheckpointEntry( + tmpWriteBuf, + cpTs, + cpRec.checkpointId(), + cpPtr, + cpRec, + CheckpointEntryType.START); - if (printCheckpointStats) - if (log.isInfoEnabled()) - log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " + - "checkpointLockHoldTime=%dms, pages=%d, reason='%s']", - cpRec.checkpointId(), - cpPtr, - tracker.lockWaitDuration(), - tracker.lockHoldDuration(), - cpPages.size(), - curr.reason) - ); + checkpointHist.addCheckpointEntry(cpEntry); + + if (printCheckpointStats) + if (log.isInfoEnabled()) + log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " + + "checkpointLockHoldTime=%dms, pages=%d, reason='%s']", + cpRec.checkpointId(), + cpPtr, + tracker.lockWaitDuration(), + tracker.lockHoldDuration(), + cpPages.size(), + curr.reason) + ); + + return new Checkpoint(cpEntry, cpPages, curr); + } + else { + if (printCheckpointStats) { + if (log.isInfoEnabled()) + LT.info(log, String.format("Skipping checkpoint (no pages were modified) [" + + "checkpointLockWait=%dms, checkpointLockHoldTime=%dms, reason='%s']", + tracker.lockWaitDuration(), + tracker.lockHoldDuration(), + curr.reason)); + } - return new Checkpoint(cpEntry, cpPages, curr); + return new Checkpoint(null, null, curr); + } } /** @@ -2333,16 +2257,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (MemoryPolicy memPlc : memoryPolicies()) ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint(); - writeCheckpointEntry( - tmpWriteBuf, - chp.cpEntry.checkpointTimestamp(), - chp.cpEntry.checkpointId(), - chp.cpEntry.checkpointMark(), - null, - CheckpointEntryType.END); + if (chp.hasDelta()) + writeCheckpointEntry( + tmpWriteBuf, + chp.cpEntry.checkpointTimestamp(), + chp.cpEntry.checkpointId(), + chp.cpEntry.checkpointMark(), + null, + CheckpointEntryType.END); } - chp.walFilesDeleted = checkpointHist.onCheckpointFinished(); + checkpointHist.onCheckpointFinished(chp); chp.progress.cpFinishFut.onDone(); } @@ -2493,6 +2418,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Number of deleted WAL files. */ private int walFilesDeleted; + /** Number of deleted WAL files. */ + private int walHistorySize; + + /** */ + private final int pagesSize; + /** * @param cpEntry Checkpoint entry. * @param cpPages Pages to write to the page store. @@ -2503,11 +2434,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan GridMultiCollectionWrapper cpPages, CheckpointProgress progress ) { - assert cpEntry.initGuard != 0; + assert cpEntry == null || cpEntry.initGuard != 0; this.cpEntry = cpEntry; this.cpPages = cpPages; this.progress = progress; + + pagesSize = cpPages == null ? 0 : cpPages.size(); + } + + /** + * @return {@code true} if this checkpoint contains at least one dirty page. + */ + private boolean hasDelta() { + return pagesSize != 0; } } @@ -2528,12 +2468,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private UUID cpStartId; /** */ + @GridToStringInclude private WALPointer startPtr; /** */ private UUID cpEndId; /** */ + @GridToStringInclude private WALPointer endPtr; /** @@ -2556,6 +2498,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan public boolean needRestoreMemory() { return !F.eq(cpStartId, cpEndId) && !F.eq(NULL_UUID, cpStartId); } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(CheckpointStatus.class, this); + } } /** @@ -2583,9 +2530,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Wakeup reason. */ private String reason; - /** */ - private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue(); - /** * @param nextCpTs Next checkpoint timestamp. */ @@ -2699,7 +2643,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Clears checkpoint history. */ - private int onCheckpointFinished() { + private void onCheckpointFinished(Checkpoint chp) { int deleted = 0; while (histMap.size() > persistenceCfg.getWalHistorySize()) { @@ -2707,8 +2651,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CheckpointEntry cpEntry = entry.getValue(); - if (cctx.wal().reserved(cpEntry.checkpointMark())) + if (cctx.wal().reserved(cpEntry.checkpointMark())) { + U.warn(log, "Could not clear historyMap due to WAL reservation on cpEntry " + cpEntry.cpId + + ", history map size is " + histMap.size()); + break; + } File startFile = new File(cpDir.getAbsolutePath(), cpEntry.startFile()); File endFile = new File(cpDir.getAbsolutePath(), cpEntry.endFile()); @@ -2738,7 +2686,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; } - return deleted; + chp.walFilesDeleted = deleted; + chp.walHistorySize = histMap.size(); } /** @@ -2760,11 +2709,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (grpState == null) continue; - CacheState.PartitionState partState = grpState.partitions().get(partId); + long partCntr = grpState.counterByPartition(partId); - if (partState != null) { + if (partCntr >= 0) { if (cctx.wal().reserve(entry.checkpointMark())) - return partState.partitionCounter(); + return partCntr; } } catch (Exception e) { @@ -2888,9 +2837,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CacheState state = cacheGrpStates.get(grpId); if (state != null) { - CacheState.PartitionState partState = state.partitions().get(part); + long cntr = state.counterByPartition(part); - return partState == null ? null : partState.partitionCounter(); + return cntr < 0 ? null : cntr; } return null; @@ -2930,157 +2879,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - - - /** - * Partition destroy queue. - */ - private static class PartitionDestroyQueue { - /** */ - private final ConcurrentMap, PartitionDestroyRequest> pendingReqs = - new ConcurrentHashMap<>(); - - /** - * @param grp Cache group. - * @param partId Partition ID to destroy. - */ - private void addDestroyRequest(CacheGroupContext grp, int partId) { - PartitionDestroyRequest req = new PartitionDestroyRequest(grp, partId); - - PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grp.groupId(), partId), req); - - assert old == null : "Must wait for old destroy request to finish before adding a new one " + - "[grpId=" + grp.groupId() + ", name=" + grp.cacheOrGroupName() + ", partId=" + partId + ']'; - } - - /** - * @param destroyId Destroy ID. - * @return Destroy request to complete if was not concurrently cancelled. - */ - private PartitionDestroyRequest beginDestroy(T2 destroyId) { - PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId); - - return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null; - } - - /** - * @param cacheId Cache ID. - * @param partId Partition ID. - * @return Destroy request to wait for if destroy has begun. - */ - private PartitionDestroyRequest cancelDestroy(int cacheId, int partId) { - PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(cacheId, partId)); - - return rmvd == null ? null : !rmvd.cancel() ? rmvd : null; - } - } - - /** - * Partition destroy request. - */ - private static class PartitionDestroyRequest { - /** */ - private int grpId; - - /** */ - private final MemoryPolicy memPlc; - - /** */ - private String name; - - /** */ - private int partId; - - /** */ - private boolean allowFastEviction; - - /** Destroy cancelled flag. */ - private boolean cancelled; - - /** Destroy future. Not null if partition destroy has begun. */ - private GridFutureAdapter destroyFut; - - /** - * @param grp Cache group. - * @param partId Partition ID. - */ - private PartitionDestroyRequest(CacheGroupContext grp, int partId) { - grpId = grp.groupId(); - memPlc = grp.memoryPolicy(); - name = grp.cacheOrGroupName(); - allowFastEviction = grp.allowFastEviction(); - - this.partId = partId; - } - - /** - * Cancels partition destroy request. - * - * @return {@code False} if this request needs to be waited for. - */ - private synchronized boolean cancel() { - if (destroyFut != null) { - assert !cancelled; - - return false; - } - - cancelled = true; - - return true; - } - - /** - * Initiates partition destroy. - * - * @return {@code True} if destroy request should be executed, {@code false} otherwise. - */ - private synchronized boolean beginDestroy() { - if (cancelled) { - assert destroyFut == null; - - return false; - } - - if (destroyFut != null) - return false; - - destroyFut = new GridFutureAdapter<>(); - - return true; - } - - /** - * - */ - private synchronized void onDone(Throwable err) { - assert destroyFut != null; - - destroyFut.onDone(err); - } - - /** - * - */ - private void waitCompleted() throws IgniteCheckedException { - GridFutureAdapter fut; - - synchronized (this) { - assert destroyFut != null; - - fut = destroyFut; - } - - fut.get(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "PartitionDestroyRequest [grpId=" + grpId + ", name=" + name + - ", partId=" + partId + ']'; - } - } - /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java index 3904205..c0176f6 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java @@ -137,9 +137,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple throws IgniteCheckedException { GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database(); - if (!grp.allowFastEviction()) - dbMgr.cancelOrWaitPartitionDestroy(grp.groupId(), p); - boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p); @@ -213,14 +210,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple try { long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); + if (pageAddr == 0L) { + U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage + + ", saveMeta=" + saveMeta + ", beforeDestroy=" + beforeDestroy + ", size=" + size + + ", updCntr=" + updCntr + ", state=" + state + ']'); + + return false; + } + + boolean changed = false; + try { PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); - io.setUpdateCounter(pageAddr, updCntr); - io.setGlobalRemoveId(pageAddr, rmvId); - io.setSize(pageAddr, size); + changed |= io.setUpdateCounter(pageAddr, updCntr); + changed |= io.setGlobalRemoveId(pageAddr, rmvId); + changed |= io.setSize(pageAddr, size); - io.setPartitionState(pageAddr, (byte)state); + changed |= io.setPartitionState(pageAddr, (byte)state); long cntrsPageId; @@ -237,7 +244,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (init) { cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); + io.setCountersPageId(pageAddr, cntrsPageId); + + changed = true; } long nextId = cntrsPageId; @@ -256,6 +266,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (init) { partMetaIo = PagePartitionCountersIO.VERSIONS.latest(); + partMetaIo.initNewPage(curAddr, curId, pageSize); } else @@ -299,6 +310,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple try { long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr); + io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1); if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, @@ -326,6 +338,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple partMap.get(store.partId()) == GridDhtPartitionState.OWNING) addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(), this.ctx.pageStore().pages(grpId, store.partId())); + + changed = true; } else pageCnt = io.getCandidatePageCount(pageAddr); @@ -343,7 +357,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple )); } finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, true); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java index 93ee411..958e0ea 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java @@ -1187,7 +1187,19 @@ public class PageMemoryImpl implements PageMemoryEx { long pageId = PageIO.getPageId(page + PAGE_OVERHEAD); - rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); + try { + rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); + } + catch (AssertionError ex) { + StringBuilder sb = new StringBuilder(sysPageSize * 2); + + for (int i = 0; i < systemPageSize(); i += 8) + sb.append(U.hexLong(GridUnsafe.getLong(page + i))); + + U.error(log, "Failed to unlock page [fullPageId=" + fullId + ", binPage=" + sb + ']'); + + throw ex; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java index 36df2e7..8884fb3 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java @@ -33,14 +33,29 @@ public class FileWALPointer implements WALPointer, Comparable { /** Written record length */ private int len; + /** Force flush flag. Used in BACKGROUND WAL mode. */ + private boolean forceFlush; + /** * @param idx File timestamp index. * @param fileOffset Offset in file, from the beginning. + * @param len Record length. */ public FileWALPointer(long idx, int fileOffset, int len) { + this(idx, fileOffset, len, false); + } + + /** + * @param idx File timestamp index. + * @param fileOffset Offset in file, from the beginning. + * @param len Record length. + * @param forceFlush Force flush flag. + */ + public FileWALPointer(long idx, int fileOffset, int len, boolean forceFlush) { this.idx = idx; this.fileOffset = fileOffset; this.len = len; + this.forceFlush = forceFlush; } /** @@ -81,6 +96,13 @@ public class FileWALPointer implements WALPointer, Comparable { return new FileWALPointer(idx, fileOffset + len, 0); } + /** + * @return Force flush flag. + */ + public boolean forceFlush() { + return forceFlush; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java index 3113c3e..292118c 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -209,9 +210,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - String consId = consistentId(); - if (!cctx.kernalContext().clientNode()) { + String consId = consistentId(); + A.notNullOrEmpty(consId, "consistentId"); consId = U.maskForFileName(consId); @@ -391,7 +392,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void fsync(WALPointer ptr) throws IgniteCheckedException, StorageException { - if (serializer == null || mode == Mode.NONE || mode == Mode.BACKGROUND) + if (serializer == null || mode == Mode.NONE) return; FileWriteHandle cur = currentHandle(); @@ -402,7 +403,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr); - if (mode == Mode.LOG_ONLY) { + boolean forceFlush = filePtr != null && filePtr.forceFlush(); + + if (mode == Mode.BACKGROUND && !forceFlush) + return; + + if (mode == Mode.LOG_ONLY || forceFlush) { cur.flushOrWait(filePtr); return; @@ -1575,7 +1581,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl rec.chainSize(newChainSize); rec.previous(h); - FileWALPointer ptr = new FileWALPointer(idx, (int)nextPos, rec.size()); + FileWALPointer ptr = new FileWALPointer( + idx, + (int)nextPos, + rec.size(), + // We need to force checkpoint records into file in BACKGROUND mode. + mode == Mode.BACKGROUND && rec instanceof CheckpointRecord); rec.position(ptr); http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java index f39cdfd..ce66b97 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java @@ -1452,7 +1452,7 @@ public class RecordV1Serializer implements RecordSerializer { CacheState state = entry.getValue(); // 2 bytes partition ID, size and counter per partition. - size += 18 * state.partitions().size(); + size += 18 * state.size(); } return size; @@ -1495,13 +1495,13 @@ public class RecordV1Serializer implements RecordSerializer { CacheState state = entry.getValue(); // Need 2 bytes for the number of partitions. - buf.putShort((short)state.partitions().size()); + buf.putShort((short)state.size()); - for (Map.Entry partEntry : state.partitions().entrySet()) { - buf.putShort((short)(int)partEntry.getKey()); + for (int i = 0; i < state.size(); i++) { + buf.putShort((short)state.partitionByIndex(i)); - buf.putLong(partEntry.getValue().size()); - buf.putLong(partEntry.getValue().partitionCounter()); + buf.putLong(state.partitionSizeByIndex(i)); + buf.putLong(state.partitionCounterByIndex(i)); } } } @@ -1593,7 +1593,7 @@ public class RecordV1Serializer implements RecordSerializer { int parts = buf.readShort() & 0xFFFF; - CacheState state = new CacheState(); + CacheState state = new CacheState(parts); for (int p = 0; p < parts; p++) { int partId = buf.readShort() & 0xFFFF; http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java index 77715c3..aec3874 100644 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalHistoryReservationsSelfTest.java @@ -124,7 +124,6 @@ public class IgniteWalHistoryReservationsSelfTest extends GridCommonAbstractTest forceCheckpoint(); - Lock lock = cache.lock(0); lock.lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java index 602b227..d3b860c 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java @@ -179,6 +179,14 @@ public class IgniteBenchmarkArguments { private boolean keysPerThread; /** */ + @Parameter(names = {"-pc", "--partitionedCachesNumber"}, description = "Number of partitioned caches") + private int partitionedCachesNumber = 1; + + /** */ + @Parameter(names = {"-rc", "--replicatedCachesNumber"}, description = "Number of replicated caches") + private int replicatedCachesNumber = 1; + + /** */ @Parameter(names = {"-ac", "--additionalCachesNumber"}, description = "Number of additional caches") private int additionalCachesNum; @@ -199,6 +207,10 @@ public class IgniteBenchmarkArguments { private int pageSize = MemoryConfiguration.DFLT_PAGE_SIZE; /** */ + @Parameter(names = {"-prt", "--partitions"}, description = "Number of cache partitions") + private int partitions = 10; + + /** */ @Parameter(names = {"-cg", "--cacheGrp"}, description = "Cache group for caches") private String cacheGrp; @@ -477,6 +489,27 @@ public class IgniteBenchmarkArguments { } /** + * @return Number of partitioned caches. + */ + public int partitionedCachesNumber() { + return partitionedCachesNumber; + } + + /** + * @return Number of replicated caches. + */ + public int replicatedCachesNumber() { + return replicatedCachesNumber; + } + + /** + * @return Number of cache partitions. + */ + public int partitions() { + return partitions; + } + + /** * @return Number of additional caches. */ public int additionalCachesNumber() {