From commits-return-125533-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Wed Dec 4 15:32:56 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 14D48180656 for ; Wed, 4 Dec 2019 16:32:54 +0100 (CET) Received: (qmail 82322 invoked by uid 500); 4 Dec 2019 15:32:54 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 82308 invoked by uid 99); 4 Dec 2019 15:32:54 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Dec 2019 15:32:54 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D6AD08D809; Wed, 4 Dec 2019 15:32:53 +0000 (UTC) Date: Wed, 04 Dec 2019 15:32:53 +0000 To: "commits@ignite.apache.org" Subject: [ignite] branch master updated: Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157547357284.19856.266310931951424674@gitbox.apache.org> From: mmuzaf@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: ignite X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 6d72874067bf3731817bba2213d9d20f3c06577c X-Git-Newrev: 9265c04a368c4cf0fc331aac5a71f7d0db365ea8 X-Git-Rev: 9265c04a368c4cf0fc331aac5a71f7d0db365ea8 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/master by this push: new 9265c04 Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100) 9265c04 is described below commit 9265c04a368c4cf0fc331aac5a71f7d0db365ea8 Author: Maxim Muzafarov AuthorDate: Wed Dec 4 18:32:34 2019 +0300 Revert "IGNITE-11704 Write tombstones during rebalance to get rid of deferred delete buffer" (#7100) This reverts commit ce9f593495a6c9c89311aa1608ffda7fe92d0aa0. --- .../communication/GridIoMessageFactory.java | 6 - .../apache/ignite/internal/pagemem/PageUtils.java | 4 +- .../internal/pagemem/wal/record/WALRecord.java | 5 +- .../delta/MetaPageUpdatePartitionDataRecord.java | 19 +- .../delta/MetaPageUpdatePartitionDataRecordV2.java | 22 +- .../delta/MetaPageUpdatePartitionDataRecordV3.java | 108 ------ .../processors/cache/CacheGroupContext.java | 19 - .../processors/cache/CacheGroupMetricsImpl.java | 23 +- .../internal/processors/cache/CacheObject.java | 3 - .../processors/cache/GridCacheContext.java | 9 +- .../processors/cache/GridCacheMapEntry.java | 319 +++++++--------- .../cache/IgniteCacheOffheapManager.java | 93 +---- .../cache/IgniteCacheOffheapManagerImpl.java | 395 ++++---------------- .../processors/cache/IncompleteCacheObject.java | 19 - .../processors/cache/IncompleteObject.java | 2 +- .../processors/cache/PartitionUpdateCounter.java | 4 +- .../processors/cache/TombstoneCacheObject.java | 94 ----- .../binary/CacheObjectBinaryProcessorImpl.java | 6 +- .../dht/topology/GridDhtLocalPartition.java | 293 +++++---------- .../dht/topology/GridDhtPartitionTopologyImpl.java | 5 +- .../dht/topology/PartitionsEvictManager.java | 366 ++++-------------- .../processors/cache/persistence/CacheDataRow.java | 5 - .../cache/persistence/CacheDataRowAdapter.java | 61 +-- .../cache/persistence/GridCacheOffheapManager.java | 369 +++++++----------- .../IgniteCacheDatabaseSharedManager.java | 104 +----- .../persistence/tree/io/PagePartitionMetaIO.java | 29 +- .../persistence/tree/io/PagePartitionMetaIOV2.java | 25 +- .../wal/serializer/RecordDataV1Serializer.java | 11 - .../internal/processors/cache/tree/DataRow.java | 6 +- .../processors/metric/impl/MetricUtils.java | 9 - .../cache/CacheDeferredDeleteSanitySelfTest.java | 6 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 7 +- .../IgniteCacheConfigVariationsFullApiTest.java | 2 +- .../CacheRemoveWithTombstonesLoadTest.java | 414 --------------------- .../distributed/CacheRemoveWithTombstonesTest.java | 289 -------------- .../CacheRemoveWithTombstonesFailoverTest.java | 187 ---------- .../DropCacheContextDuringEvictionTest.java | 24 +- .../PartitionsEvictManagerAbstractTest.java | 113 +++--- .../PartitionsEvictionTaskFailureHandlerTest.java | 72 +--- .../processors/database/CacheFreeListSelfTest.java | 5 - .../testsuites/IgniteCacheMvccTestSuite9.java | 18 +- .../ignite/testsuites/IgniteCacheTestSuite9.java | 7 - .../query/h2/database/H2PkHashIndex.java | 4 +- .../processors/query/h2/opt/H2CacheRow.java | 5 - 44 files changed, 669 insertions(+), 2917 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b5ae4f6..d8d62d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.processors.cache.TombstoneCacheObject; import org.apache.ignite.internal.processors.cache.WalStateAckMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; @@ -1167,11 +1166,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 176: - msg = TombstoneCacheObject.INSTANCE; - - break; - // [-3..119] [124..129] [-23..-28] [-36..-55] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java index 0b9b1b4..217164c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java @@ -56,8 +56,8 @@ public class PageUtils { */ public static byte[] getBytes(long addr, int off, int len) { assert addr > 0 : addr; - assert off >= 0 : off; - assert len >= 0 : len; + assert off >= 0; + assert len >= 0; byte[] bytes = new byte[len]; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 9dca94d..ea884db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -221,10 +221,7 @@ public abstract class WALRecord { ROLLBACK_TX_RECORD (57, LOGICAL), /** Partition meta page containing update counter gaps. */ - PARTITION_META_PAGE_UPDATE_COUNTERS_V2 (58, PHYSICAL), - - /** Partition meta page containing tombstone presence flag. */ - PARTITION_META_PAGE_UPDATE_COUNTERS_V3 (60, PHYSICAL); + PARTITION_META_PAGE_UPDATE_COUNTERS_V2 (58, PHYSICAL); /** Index for serialization. Should be consistent throughout all versions. */ private final int idx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java index 0c9f7fc..3e2b67b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java @@ -46,7 +46,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { private int allocatedIdxCandidate; /** */ - private long cacheSizesPageId; + private long cntrsPageId; /** * @param grpId Cache group ID. @@ -59,10 +59,9 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { long updateCntr, long globalRmvId, int partSize, - long cacheSizesPageId, + long cntrsPageId, byte state, - int allocatedIdxCandidate - ) { + int allocatedIdxCandidate) { super(grpId, pageId); this.updateCntr = updateCntr; @@ -70,7 +69,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { this.partSize = partSize; this.state = state; this.allocatedIdxCandidate = allocatedIdxCandidate; - this.cacheSizesPageId = cacheSizesPageId; + this.cntrsPageId = cntrsPageId; } /** @@ -82,7 +81,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { this.updateCntr = in.readLong(); this.globalRmvId = in.readLong(); this.partSize = in.readInt(); - this.cacheSizesPageId = in.readLong(); + this.cntrsPageId = in.readLong(); this.state = in.readByte(); this.allocatedIdxCandidate = in.readInt(); } @@ -111,8 +110,8 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { /** * @return Partition size. */ - public long cacheSizesPageId() { - return cacheSizesPageId; + public long countersPageId() { + return cntrsPageId; } /** @@ -129,7 +128,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { io.setUpdateCounter(pageAddr, updateCntr); io.setGlobalRemoveId(pageAddr, globalRmvId); io.setSize(pageAddr, partSize); - io.setSizesPageId(pageAddr, cacheSizesPageId); + io.setCountersPageId(pageAddr, cntrsPageId); io.setPartitionState(pageAddr, state); io.setCandidatePageCount(pageAddr, allocatedIdxCandidate); } @@ -151,7 +150,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { buf.putLong(updateCounter()); buf.putLong(globalRemoveId()); buf.putInt(partitionSize()); - buf.putLong(cacheSizesPageId()); + buf.putLong(countersPageId()); buf.put(state()); buf.putInt(allocatedIndexCandidate()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java index a8a8597..ab3ccf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV2.java @@ -28,12 +28,11 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti import org.apache.ignite.internal.util.typedef.internal.S; /** - * Partition meta page delta record. - * Contains reference to update counters gaps. + * */ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartitionDataRecord { /** */ - private long gapsLink; + private long link; /** * @param grpId Group id. @@ -44,7 +43,7 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition * @param cntrsPageId Cntrs page id. * @param state State. * @param allocatedIdxCandidate Allocated index candidate. - * @param gapsLink Link. + * @param link Link. */ public MetaPageUpdatePartitionDataRecordV2( int grpId, @@ -55,10 +54,9 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition long cntrsPageId, byte state, int allocatedIdxCandidate, - long gapsLink - ) { + long link) { super(grpId, pageId, updateCntr, globalRmvId, partSize, cntrsPageId, state, allocatedIdxCandidate); - this.gapsLink = gapsLink; + this.link = link; } /** @@ -67,7 +65,7 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition public MetaPageUpdatePartitionDataRecordV2(DataInput in) throws IOException { super(in); - this.gapsLink = in.readLong(); + this.link = in.readLong(); } /** {@inheritDoc} */ @@ -76,21 +74,21 @@ public class MetaPageUpdatePartitionDataRecordV2 extends MetaPageUpdatePartition PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - io.setGapsLink(pageAddr, gapsLink); + io.setGapsLink(pageAddr, link); } /** * */ - public long gapsLink() { - return gapsLink; + public long link() { + return link; } /** {@inheritDoc} */ @Override public void toBytes(ByteBuffer buf) { super.toBytes(buf); - buf.putLong(gapsLink()); + buf.putLong(link()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java deleted file mode 100644 index 1263c43..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecordV3.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagemem.wal.record.delta; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.pagemem.PageIdUtils; -import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Partition meta page delta record. - * Contains information about tombstones count. - */ -public class MetaPageUpdatePartitionDataRecordV3 extends MetaPageUpdatePartitionDataRecordV2 { - /** Tombstones count. */ - private long tombstonesCnt; - - /** - * @param grpId Group id. - * @param pageId Page id. - * @param updateCntr Update counter. - * @param globalRmvId Global remove id. - * @param partSize Partition size. - * @param cacheSizesPageId Cache sizes page id. - * @param state State. - * @param allocatedIdxCandidate Allocated index candidate. - * @param gapsLink Gaps link. - * @param tombstonesCnt Tombstones count. - */ - public MetaPageUpdatePartitionDataRecordV3( - int grpId, - long pageId, - long updateCntr, - long globalRmvId, - int partSize, - long cacheSizesPageId, - byte state, - int allocatedIdxCandidate, - long gapsLink, - long tombstonesCnt - ) { - super(grpId, pageId, updateCntr, globalRmvId, partSize, cacheSizesPageId, state, allocatedIdxCandidate, gapsLink); - this.tombstonesCnt = tombstonesCnt; - } - - /** - * @param in In. - */ - public MetaPageUpdatePartitionDataRecordV3(DataInput in) throws IOException { - super(in); - - this.tombstonesCnt = in.readLong(); - } - - /** - * @return Tombstones count. - */ - public long tombstonesCount() { - return tombstonesCnt; - } - - /** {@inheritDoc} */ - @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { - super.applyDelta(pageMem, pageAddr); - - PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2) PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - - io.setTombstonesCount(pageAddr, tombstonesCnt); - } - - /** {@inheritDoc} */ - @Override public void toBytes(ByteBuffer buf) { - super.toBytes(buf); - - buf.putLong(tombstonesCnt); - } - - /** {@inheritDoc} */ - @Override public RecordType type() { - return RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS_V3; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MetaPageUpdatePartitionDataRecordV2.class, this, "partId", PageIdUtils.partId(pageId()), - "super", super.toString()); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index e753edb..ad35570 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -46,8 +46,6 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; @@ -263,8 +261,6 @@ public class CacheGroupContext { statHolderIdx = new IoStatisticsHolderIndex(HASH_INDEX, cacheOrGroupName(), HASH_PK_IDX_NAME, mmgr); statHolderData = new IoStatisticsHolderCache(cacheOrGroupName(), grpId, mmgr); } - - hasAtomicCaches = ccfg.getAtomicityMode() == ATOMIC; } /** @@ -1303,21 +1299,6 @@ public class CacheGroupContext { } /** - * @return {@code True} if need create temporary tombstones entries for removed data. - */ - public boolean supportsTombstone() { - return !mvccEnabled && !isLocal(); - } - - /** - * @param part Partition. - * @return {@code True} if need create tombstone for remove in given partition. - */ - public boolean shouldCreateTombstone(@Nullable GridDhtLocalPartition part) { - return part != null && supportsTombstone() && part.state() == GridDhtPartitionState.MOVING; - } - - /** * @return Metrics. */ public CacheGroupMetricsImpl metrics() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java index fab2e1f..e82e451 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java @@ -70,7 +70,6 @@ public class CacheGroupMetricsImpl { private final LongMetric sparseStorageSize; /** Interface describing a predicate of two integers. */ - @FunctionalInterface private interface IntBiPredicate { /** * Predicate body. @@ -170,10 +169,6 @@ public class CacheGroupMetricsImpl { mreg.register("TotalAllocatedSize", this::getTotalAllocatedSize, "Total size of memory allocated for group, in bytes."); - - mreg.register("Tombstones", - this::getTombstones, - "Number of tombstone entries."); } /** */ @@ -258,12 +253,20 @@ public class CacheGroupMetricsImpl { /** */ public int getMinimumNumberOfPartitionCopies() { - return numberOfPartitionCopies((targetVal, nextVal) -> nextVal < targetVal); + return numberOfPartitionCopies(new IntBiPredicate() { + @Override public boolean apply(int targetVal, int nextVal) { + return nextVal < targetVal; + } + }); } /** */ public int getMaximumNumberOfPartitionCopies() { - return numberOfPartitionCopies((targetVal, nextVal) -> nextVal > targetVal); + return numberOfPartitionCopies(new IntBiPredicate() { + @Override public boolean apply(int targetVal, int nextVal) { + return nextVal > targetVal; + } + }); } /** @@ -459,12 +462,6 @@ public class CacheGroupMetricsImpl { return sparseStorageSize == null ? 0 : sparseStorageSize.value(); } - /** */ - public long getTombstones() { - return ctx.topology().localPartitions().stream() - .map(part -> part.dataStore().tombstonesCount()).reduce(Long::sum).orElse(0L); - } - /** Removes all metric for cache group. */ public void remove() { ctx.shared().kernalContext().metric().remove(metricGroupName()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 5e89926..f9f384a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -38,9 +38,6 @@ public interface CacheObject extends Message { /** */ public static final byte TYPE_BINARY_ENUM = 101; - /** */ - public static final byte TOMBSTONE = -1; - /** * @param ctx Context. * @param cpy If {@code true} need to copy value. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index a97d3ee..9ddafb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -625,13 +625,8 @@ public class GridCacheContext implements Externalizable { public void cache(GridCacheAdapter cache) { this.cache = cache; - if (grp.supportsTombstone() && cache.configuration().getAtomicityMode() == TRANSACTIONAL - && !store().configured()) - deferredDel = false; - else { - deferredDel = (cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || - (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC)); - } + deferredDel = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || + (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 641001f..a064503 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -98,9 +98,9 @@ import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; @@ -1713,19 +1713,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0); - if (cctx.cancelRemove(interceptRes)) + if (cctx.cancelRemove(interceptRes)) { + CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); + return new GridCacheUpdateTxResult(false, logPtr); + } } - if (cctx.group().shouldCreateTombstone(localPartition())) { - cctx.offheap().removeWithTombstone(cctx, key, newVer, localPartition()); - - // Partition may change his state during remove. - if (!cctx.group().shouldCreateTombstone(localPartition())) - removeTombstone0(newVer); - } - else - removeValue(); + removeValue(); update(null, 0, 0, newVer, true); @@ -2823,34 +2818,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * @param tombstoneVer Tombstone version. - * @throws GridCacheEntryRemovedException If entry was removed. - * @throws IgniteCheckedException If failed. - */ - public void removeTombstone(GridCacheVersion tombstoneVer) throws GridCacheEntryRemovedException, IgniteCheckedException { - lockEntry(); - - try { - checkObsolete(); - - removeTombstone0(tombstoneVer); - } - finally { - unlockEntry(); - } - } - - /** - * @param tombstoneVer Tombstone version. - * @throws IgniteCheckedException If failed. - */ - private void removeTombstone0(GridCacheVersion tombstoneVer) throws IgniteCheckedException { - RemoveClosure c = new RemoveClosure(this, tombstoneVer); - - cctx.offheap().invoke(cctx, key, localPartition(), c); - } - - /** * @return {@code True} if this entry should not be evicted from cache. */ protected boolean evictionDisabled() { @@ -3370,18 +3337,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean update; - IgniteBiPredicate p = new IgniteBiPredicate() { - @Override public boolean apply(@Nullable CacheObject val, GridCacheVersion currVer) { + IgnitePredicate p = new IgnitePredicate() { + @Override public boolean apply(@Nullable CacheDataRow row) { boolean update0; - boolean isStartVer = cctx.shared().versions().isStartVersion(currVer); + GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver; + + boolean isStartVer = cctx.shared().versions().isStartVersion(currentVer); if (cctx.group().persistenceEnabled()) { if (!isStartVer) { if (cctx.atomic()) - update0 = ATOMIC_VER_COMPARATOR.compare(currVer, ver) < 0; + update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0; else - update0 = currVer.compareTo(ver) < 0; + update0 = currentVer.compareTo(ver) < 0; } else update0 = true; @@ -3389,15 +3358,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else update0 = isStartVer; - // Such combination may exist during datastreamer first update. - update0 |= (!preload && val == null); + update0 |= (!preload && deletedUnlocked()); return update0; } }; if (unswapped) { - update = p.apply(this.val, this.ver); + update = p.apply(null); if (update) { // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. @@ -3428,7 +3396,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // cannot identify whether the entry is exist on the fly unswap(false); - if (update = p.apply(this.val, this.ver)) { + if (update = p.apply(null)) { // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. long oldExpTime = expireTimeUnlocked(); long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis()); @@ -4288,11 +4256,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param ver New entry version. * @throws IgniteCheckedException If update failed. */ - protected boolean storeValue( - @Nullable CacheObject val, + protected boolean storeValue(@Nullable CacheObject val, long expireTime, - GridCacheVersion ver - ) throws IgniteCheckedException { + GridCacheVersion ver) throws IgniteCheckedException { return storeValue(val, expireTime, ver, null, null); } @@ -4302,26 +4268,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param val Value. * @param expireTime Expire time. * @param ver New entry version. - * @param p Optional predicate. + * @param predicate Optional predicate. * @param row Pre-created data row, associated with this cache entry. * @return {@code True} if storage was modified. * @throws IgniteCheckedException If update failed. */ - private boolean storeValue( + protected boolean storeValue( @Nullable CacheObject val, long expireTime, GridCacheVersion ver, - @Nullable IgniteBiPredicate p, + @Nullable IgnitePredicate predicate, @Nullable CacheDataRow row ) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); assert localPartition() == null || localPartition().state() != RENTING : localPartition(); - UpdateClosure c = new UpdateClosure(this, val, ver, expireTime, p, row); + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row); - cctx.offheap().invoke(cctx, key, localPartition(), c); + cctx.offheap().invoke(cctx, key, localPartition(), closure); - return c.treeOp != IgniteTree.OperationType.NOOP; + return closure.treeOp != IgniteTree.OperationType.NOOP; } /** @@ -4515,9 +4481,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheDataRow row = cctx.offheap().read(this); - if (cctx.offheap().isTombstone(row)) - return; - if (row != null && (filter == null || filter.apply(row))) clo.apply(row); } @@ -5742,101 +5705,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * @param row Data row. - * @return {@code True} if row expired. - * @throws IgniteCheckedException If failed. - */ - private boolean checkRowExpired(CacheDataRow row) throws IgniteCheckedException { - assert row != null; - - if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())) - return false; - - CacheObject expiredVal = row.value(); - - if (cctx.deferredDelete() && !detached() && !isInternal()) { - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); - - if (!deletedUnlocked()) - deletedUnlocked(true); - } - else - markObsolete0(cctx.versions().next(), true, null); - - if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { - cctx.events().addEvent(partition(), - key(), - cctx.localNodeId(), - null, - EVT_CACHE_OBJECT_EXPIRED, - null, - false, - expiredVal, - expiredVal != null, - null, - null, - null, - true); - } - - cctx.continuousQueries().onEntryExpired(this, key(), expiredVal); - - return true; - } - - /** - * - */ - private static class RemoveClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { - /** */ - private final GridCacheMapEntry entry; - - /** */ - private final GridCacheVersion ver; - - /** */ - private IgniteTree.OperationType op; - - /** */ - private CacheDataRow oldRow; - - public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) { - this.entry = entry; - this.ver = ver; - } - - /** {@inheritDoc} */ - @Override public @Nullable CacheDataRow oldRow() { - return oldRow; - } - - /** {@inheritDoc} */ - @Override public void call(@Nullable CacheDataRow row) throws IgniteCheckedException { - if (row == null || !ver.equals(row.version())) { - op = IgniteTree.OperationType.NOOP; - - return; - } - - row.key(entry.key); - - oldRow = row; - - op = IgniteTree.OperationType.REMOVE; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow newRow() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteTree.OperationType operationType() { - return op; - } - } - - /** * */ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { @@ -5853,7 +5721,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final long expireTime; /** */ - @Nullable private final IgniteBiPredicate p; + @Nullable private final IgnitePredicate predicate; /** */ private CacheDataRow newRow; @@ -5869,48 +5737,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param val New value. * @param ver New version. * @param expireTime New expire time. - * @param p Optional predicate. - * @param newRow New row value. + * @param predicate Optional predicate. */ - private UpdateClosure( - GridCacheMapEntry entry, - @Nullable CacheObject val, - GridCacheVersion ver, - long expireTime, - @Nullable IgniteBiPredicate p, - @Nullable CacheDataRow newRow - ) { + UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime, + @Nullable IgnitePredicate predicate, @Nullable CacheDataRow newRow) { this.entry = entry; this.val = val; this.ver = ver; this.expireTime = expireTime; - this.p = p; + this.predicate = predicate; this.newRow = newRow; } /** {@inheritDoc} */ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { - if (oldRow != null) + if (oldRow != null) { oldRow.key(entry.key); - this.oldRow = oldRow; - - if (p != null) { - CacheObject val = null; - GridCacheVersion ver = entry.ver; - - if (oldRow != null) { - if (!entry.checkRowExpired(oldRow) && !entry.context().offheap().isTombstone(oldRow)) - val = oldRow.value(); + oldRow = checkRowExpired(oldRow); + } - ver = oldRow.version(); - } + this.oldRow = oldRow; - if (!p.apply(val, ver)) { - treeOp = IgniteTree.OperationType.NOOP; + if (predicate != null && !predicate.apply(oldRow)) { + treeOp = IgniteTree.OperationType.NOOP; - return; - } + return; } if (val != null) { @@ -5921,8 +5773,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val, ver, expireTime, - oldRow - ); + oldRow); } treeOp = oldRow != null && oldRow.link() == newRow.link() ? @@ -5946,6 +5797,53 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable @Override public CacheDataRow oldRow() { return oldRow; } + + /** + * Checks row for expiration and fire expire events if needed. + * + * @param row old row. + * @return {@code Null} if row was expired, row itself otherwise. + * @throws IgniteCheckedException + */ + private CacheDataRow checkRowExpired(CacheDataRow row) throws IgniteCheckedException { + assert row != null; + + if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())) + return row; + + GridCacheContext cctx = entry.context(); + + CacheObject expiredVal = row.value(); + + if (cctx.deferredDelete() && !entry.detached() && !entry.isInternal()) { + entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, entry.ver, true); + + if (!entry.deletedUnlocked() && !entry.isStartVersion()) + entry.deletedUnlocked(true); + } + else + entry.markObsolete0(cctx.versions().next(), true, null); + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { + cctx.events().addEvent(entry.partition(), + entry.key(), + cctx.localNodeId(), + null, + EVT_CACHE_OBJECT_EXPIRED, + null, + false, + expiredVal, + expiredVal != null, + null, + null, + null, + true); + } + + cctx.continuousQueries().onEntryExpired(entry, entry.key(), expiredVal); + + return null; + } } /** @@ -6120,7 +6018,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // unswap entry.update(oldRow.value(), oldRow.expireTime(), 0, oldRow.version(), false); - if (entry.checkRowExpired(oldRow)) { + if (checkRowExpired(oldRow)) { oldRowExpiredFlag = true; oldRow = null; @@ -6272,6 +6170,53 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * Check row expiration and fire expire events if needed. + * + * @param row Old row. + * @return {@code True} if row was expired, {@code False} otherwise. + * @throws IgniteCheckedException if failed. + */ + private boolean checkRowExpired(CacheDataRow row) throws IgniteCheckedException { + assert row != null; + + if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())) + return false; + + GridCacheContext cctx = entry.context(); + + CacheObject expiredVal = row.value(); + + if (cctx.deferredDelete() && !entry.detached() && !entry.isInternal()) { + entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, entry.ver, true); + + if (!entry.deletedUnlocked()) + entry.deletedUnlocked(true); + } + else + entry.markObsolete0(cctx.versions().next(), true, null); + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { + cctx.events().addEvent(entry.partition(), + entry.key(), + cctx.localNodeId(), + null, + EVT_CACHE_OBJECT_EXPIRED, + null, + false, + expiredVal, + expiredVal != null, + null, + null, + null, + true); + } + + cctx.continuousQueries().onEntryExpired(entry, entry.key(), expiredVal); + + return true; + } + + /** * @param storeLoadedVal Value loaded from store. * @param updateExpireTime {@code True} if need update expire time. * @throws IgniteCheckedException If failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 283aaea..e73ad52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware; import org.apache.ignite.internal.processors.cache.persistence.RootPage; @@ -163,6 +162,11 @@ public interface IgniteCacheOffheapManager { public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException; /** + * TODO: GG-10884, used on only from initialValue. + */ + public boolean containsKey(GridCacheMapEntry entry); + + /** * @param cctx Cache context. * @param c Closure. * @param amount Limit of processed entries by single call, {@code -1} for no limit. @@ -220,7 +224,7 @@ public interface IgniteCacheOffheapManager { * @return Iterator over all versions. * @throws IgniteCheckedException If failed. */ - GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x) + GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x) throws IgniteCheckedException; /** @@ -401,27 +405,6 @@ public interface IgniteCacheOffheapManager { ) throws IgniteCheckedException; /** - * @param cctx Cache context. - * @param key Key. - * @param ver Version. - * @param part Partition. - * @throws IgniteCheckedException If failed. - */ - public void removeWithTombstone( - GridCacheContext cctx, - KeyCacheObject key, - GridCacheVersion ver, - GridDhtLocalPartition part - ) throws IgniteCheckedException; - - /** - * @param row Data row. - * @return {@code True} if give row is tombstone. - * @throws IgniteCheckedException If failed. - */ - public boolean isTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException; - - /** * @param ldr Class loader. * @return Number of undeployed entries. */ @@ -458,20 +441,10 @@ public interface IgniteCacheOffheapManager { /** * @param part Partition number. - * @param withTombstones {@code True} if should return tombstone entries. * @return Iterator for given partition. * @throws IgniteCheckedException If failed. */ - public GridIterator partitionIterator(final int part, boolean withTombstones) throws IgniteCheckedException; - - /** - * @param part Partition number. - * @return Iterator for given partition that skips tombstones. - * @throws IgniteCheckedException If failed. - */ - public default GridIterator partitionIterator(final int part) throws IgniteCheckedException { - return partitionIterator(part, false); - } + public GridIterator partitionIterator(final int part) throws IgniteCheckedException; /** * @param part Partition number. @@ -759,7 +732,7 @@ public interface IgniteCacheOffheapManager { * * @param cctx Cache context. * @param row Row. - * @throws IgniteCheckedException If failed. + * @throws IgniteCheckedException */ public void updateTxState(GridCacheContext cctx, CacheSearchRow row) throws IgniteCheckedException; @@ -932,7 +905,7 @@ public interface IgniteCacheOffheapManager { * @param ver Version. * @param expireTime Expire time. * @param mvccVer Mvcc version. - * @throws IgniteCheckedException If failed. + * @throws IgniteCheckedException */ void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, @@ -953,20 +926,6 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. * @param key Key. - * @param ver Version. - * @param part Partition. - * @throws IgniteCheckedException If failed. - */ - public void removeWithTombstone( - GridCacheContext cctx, - KeyCacheObject key, - GridCacheVersion ver, - GridDhtLocalPartition part - ) throws IgniteCheckedException; - - /** - * @param cctx Cache context. - * @param key Key. * @return Data row. * @throws IgniteCheckedException If failed. */ @@ -981,7 +940,7 @@ public interface IgniteCacheOffheapManager { * @return Iterator over all versions. * @throws IgniteCheckedException If failed. */ - GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x) + GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x) throws IgniteCheckedException; /** @@ -1008,23 +967,14 @@ public interface IgniteCacheOffheapManager { * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public default GridCursor cursor() throws IgniteCheckedException { - return cursor(false); - } - - /** - * @param withTombstones {@code True} if should return tombstone entries. - * @return Data cursor. - * @throws IgniteCheckedException If failed. - */ - public GridCursor cursor(boolean withTombstones) throws IgniteCheckedException; + public GridCursor cursor() throws IgniteCheckedException; /** * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException; + public GridCursor cursor(Object x) throws IgniteCheckedException; /** * @param mvccSnapshot MVCC snapshot. @@ -1035,11 +985,10 @@ public interface IgniteCacheOffheapManager { /** * @param cacheId Cache ID. - * @param withTombstones {@code True} if should return tombstone entries. * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException; + public GridCursor cursor(int cacheId) throws IgniteCheckedException; /** * @param cacheId Cache ID. @@ -1069,7 +1018,7 @@ public interface IgniteCacheOffheapManager { * @throws IgniteCheckedException If failed. */ public GridCursor cursor(int cacheId, KeyCacheObject lower, - KeyCacheObject upper, CacheDataRowAdapter.RowData x) throws IgniteCheckedException; + KeyCacheObject upper, Object x) throws IgniteCheckedException; /** * @param cacheId Cache ID. @@ -1077,16 +1026,11 @@ public interface IgniteCacheOffheapManager { * @param upper Upper bound. * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. * @param snapshot Mvcc snapshot. - * @param withTombstones {@code True} if should return tombstone entries. * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor cursor(int cacheId, - KeyCacheObject lower, - KeyCacheObject upper, - CacheDataRowAdapter.RowData x, - MvccSnapshot snapshot, - boolean withTombstones) throws IgniteCheckedException; + public GridCursor cursor(int cacheId, KeyCacheObject lower, + KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException; /** * Destroys the tree associated with the store. @@ -1150,10 +1094,5 @@ public interface IgniteCacheOffheapManager { * Partition storage. */ public PartitionMetaStorage partStorage(); - - /** - * @return Number of tombstone entries. - */ - public long tombstonesCount(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 18e9647..0df7728 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -142,7 +142,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.state; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.EMPTY_CURSOR; import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE; -import static org.apache.ignite.internal.util.IgniteTree.OperationType.IN_PLACE; import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP; import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; @@ -437,8 +436,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, - OffheapInvokeClosure c - ) throws IgniteCheckedException { + OffheapInvokeClosure c) + throws IgniteCheckedException { dataStore(part).invoke(cctx, key, c); } @@ -617,28 +616,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void removeWithTombstone( - GridCacheContext cctx, - KeyCacheObject key, - GridCacheVersion ver, - GridDhtLocalPartition part - ) throws IgniteCheckedException { - assert part != null; - assert !cctx.isNear(); - assert !cctx.isLocal(); - - dataStore(part).removeWithTombstone(cctx, key, ver, part); - } - - /** {@inheritDoc} */ - @Override public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException { - if (!grp.supportsTombstone()) - return false; - - return grp.shared().database().isTombstone(row); - } - - /** {@inheritDoc} */ @Override @Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException { KeyCacheObject key = entry.key(); @@ -685,7 +662,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridCursor mvccAllVersionsCursor(GridCacheContext cctx, - KeyCacheObject key, CacheDataRowAdapter.RowData x) throws IgniteCheckedException { + KeyCacheObject key, Object x) throws IgniteCheckedException { CacheDataStore dataStore = dataStore(cctx, key); return dataStore != null ? dataStore.mvccAllVersionsCursor(cctx, key, x) : EMPTY_CURSOR; @@ -706,6 +683,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public boolean containsKey(GridCacheMapEntry entry) { + try { + return read(entry) != null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to read value", e); + + return false; + } + } + + /** {@inheritDoc} */ @Override public void onPartitionCounterUpdated(int part, long cntr) { // No-op. } @@ -729,8 +718,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheVersion obsoleteVer = null; try (GridCloseableIterator it = grp.isLocal() ? - iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null, true) : - evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator(), true)) { + iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null) : + evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) { while (it.hasNext()) { cctx.shared().database().checkpointReadLock(); @@ -873,7 +862,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled ) { - return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled, false); + return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled); } /** {@inheritDoc} */ @@ -884,17 +873,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (data == null) return new GridEmptyCloseableIterator<>(); - return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled, false); + return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled); } /** {@inheritDoc} */ - @Override public GridIterator partitionIterator(int part, boolean withTombstones) { + @Override public GridIterator partitionIterator(int part) { CacheDataStore data = partitionData(part); if (data == null) return new GridEmptyCloseableIterator<>(); - return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null, withTombstones); + return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null); } /** @@ -903,14 +892,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param dataIt Data store iterator. * @param mvccSnapshot Mvcc snapshot. * @param dataPageScanEnabled Flag to enable data page scan. - * @param withTombstones {@code True} if should return tombstone entries. * @return Rows iterator */ private GridCloseableIterator iterator(final int cacheId, final Iterator dataIt, final MvccSnapshot mvccSnapshot, - Boolean dataPageScanEnabled, - boolean withTombstones + Boolean dataPageScanEnabled ) { return new GridCloseableIteratorAdapter() { /** */ @@ -947,7 +934,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { if (mvccSnapshot == null) - cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones); + cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); else { cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot); @@ -979,14 +966,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** * @param cacheId Cache ID. * @param dataIt Data store iterator. - * @param withTombstones {@code True} if should return tombstone entries. * @return Rows iterator */ - private GridCloseableIterator evictionSafeIterator( - final int cacheId, - final Iterator dataIt, - boolean withTombstones - ) { + private GridCloseableIterator evictionSafeIterator(final int cacheId, final Iterator dataIt) { return new GridCloseableIteratorAdapter() { /** */ private GridCursor cur; @@ -1017,7 +999,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (!reservePartition(ds.partId())) continue; - cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor(withTombstones) : ds.cursor(cacheId, withTombstones); + cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); } else break; @@ -1476,9 +1458,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** */ private final PageHandler mvccApplyChanges = new MvccApplyChangesHandler(); - /** Tombstones counter. */ - private final AtomicLong tombstonesCnt = new AtomicLong(); - /** * @param partId Partition number. * @param rowStore Row store. @@ -1730,23 +1709,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager case REMOVE: { CacheDataRow oldRow = c.oldRow(); - finishRemove(cctx, row.key(), oldRow, null); + finishRemove(cctx, row.key(), oldRow); break; } - case IN_PLACE: - assert !isTombstone(c.newRow()); - - if (isTombstone(c.oldRow())) { - tombstoneRemoved(); - - incrementSize(cctx.cacheId()); - } - - break; - case NOOP: + case IN_PLACE: break; default: @@ -1764,10 +1733,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Nullable CacheDataRow oldRow) throws IgniteCheckedException { int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - // Set real stored cacheId to properly calculate row size. - if (oldRow != null) - oldRow.cacheId(cacheId); - DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId); if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow, grp.statisticsHolderData())) @@ -1783,13 +1748,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert dataRow.link() != 0 : dataRow; - if (grp.sharedGroup()) { - if (dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) - dataRow.cacheId(cctx.cacheId()); - - if (oldRow != null && oldRow.cacheId() == CU.UNDEFINED_CACHE_ID) - oldRow.cacheId(cctx.cacheId()); - } + if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) + dataRow.cacheId(cctx.cacheId()); return dataRow; } @@ -2647,12 +2607,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager */ private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { - assert !isTombstone(newRow); - - boolean oldTombstone = isTombstone(oldRow); - boolean oldNull = oldRow == null || oldTombstone; - - if (oldNull) + if (oldRow == null) incrementSize(cctx.cacheId()); KeyCacheObject key = newRow.key(); @@ -2660,9 +2615,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.store(newRow, oldNull ? null : oldRow, true); + qryMgr.store(newRow, oldRow, true); - updatePendingEntries(cctx, newRow, oldNull ? null : oldRow); + updatePendingEntries(cctx, newRow, oldRow); if (oldRow != null) { assert oldRow.link() != 0 : oldRow; @@ -2671,10 +2626,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager rowStore.removeRow(oldRow.link(), grp.statisticsHolderData()); } - updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), newRow.value()); - - if (oldTombstone) - tombstoneRemoved(); + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); } /** @@ -2717,97 +2669,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key)); - finishRemove(cctx, key, oldRow, null); - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * - */ - private class RemoveWithTombstone implements IgniteCacheOffheapManager.OffheapInvokeClosure { - /** */ - private final GridCacheContext cctx; - - /** */ - private final KeyCacheObject key; - - /** */ - private final GridCacheVersion ver; - - /** */ - private CacheDataRow oldRow; - - /** */ - private CacheDataRow newRow; - - /** - * @param cctx Context. - * @param key Key. - * @param ver Version. - */ - RemoveWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver) { - this.cctx = cctx; - this.key = key; - this.ver = ver; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow oldRow() { - return oldRow; - } - - /** {@inheritDoc} */ - @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { - if (oldRow != null) - oldRow.key(key); - - this.oldRow = oldRow; - - newRow = createRow(cctx, key, TombstoneCacheObject.INSTANCE, ver, 0, oldRow); - } - - /** {@inheritDoc} */ - @Override public CacheDataRow newRow() { - return newRow; - } - - /** {@inheritDoc} */ - @Override public IgniteTree.OperationType operationType() { - if (oldRow != null && oldRow.link() == newRow.link()) - return IgniteTree.OperationType.IN_PLACE; - - return PUT; - } - } - - /** {@inheritDoc} */ - @Override public void removeWithTombstone( - GridCacheContext cctx, - KeyCacheObject key, - GridCacheVersion ver, - GridDhtLocalPartition part - ) throws IgniteCheckedException { - if (!busyLock.enterBusy()) - throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); - - try { - assert cctx.shared().database().checkpointLockIsHeldByThread(); - - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - - RemoveWithTombstone c = new RemoveWithTombstone(cctx, key, ver); - - dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c); - - assert c.operationType() == PUT || c.operationType() == IN_PLACE : c.operationType(); - - if (!isTombstone(c.oldRow)) - tombstoneCreated(); - - finishRemove(cctx, key, c.oldRow, c.newRow); + finishRemove(cctx, key, oldRow); } finally { busyLock.leaveBusy(); @@ -2818,19 +2680,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param cctx Cache context. * @param key Key. * @param oldRow Removed row. - * @param tombstoneRow Tombstone row (if tombstone was created for remove). * @throws IgniteCheckedException If failed. */ - private void finishRemove( - GridCacheContext cctx, - KeyCacheObject key, - @Nullable CacheDataRow oldRow, - @Nullable CacheDataRow tombstoneRow - ) throws IgniteCheckedException { - boolean oldTombstone = isTombstone(oldRow); - boolean oldNull = oldRow == null || oldTombstone; - - if (!oldNull) { + private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + if (oldRow != null) { clearPendingEntries(cctx, oldRow); decrementSize(cctx.cacheId()); @@ -2839,15 +2692,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.remove(key, oldNull ? null : oldRow); + qryMgr.remove(key, oldRow); - if (oldRow != null && (tombstoneRow == null || tombstoneRow.link() != oldRow.link())) + if (oldRow != null) rowStore.removeRow(oldRow.link(), grp.statisticsHolderData()); - updateIgfsMetrics(cctx, key, (oldNull ? null : oldRow.value()), null); - - if (oldTombstone && tombstoneRow == null) - tombstoneRemoved(); + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null); } /** @@ -2867,15 +2717,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); } - /** - * @param row Data row. - * @return {@code Null} if given row is tombstone, otherwise row itself. - * @throws IgniteCheckedException If null. - */ - @Nullable private CacheDataRow checkTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException { - return grp.offheap().isTombstone(row) ? null : row; - } - /** {@inheritDoc} */ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { key.valueBytes(cctx.cacheObjectContext()); @@ -2895,12 +2736,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager row = clo.row(); } - else { + else row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); - row = checkTombstone(row); - } - afterRowFound(row, key); return row; @@ -2948,7 +2786,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, CacheDataRowAdapter.RowData x) + @Override public GridCursor mvccAllVersionsCursor(GridCacheContext cctx, KeyCacheObject key, Object x) throws IgniteCheckedException { int cacheId = cctx.cacheId(); @@ -2997,91 +2835,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridCursor cursor(boolean withTombstones) throws IgniteCheckedException { - GridCursor cur = dataTree.find(null, null); - - return withTombstones ? cur : cursorSkipTombstone(cur); - } - - /** - * @param cur Cursor. - * @return Cursor skipping non-tombstone entries. - */ - private GridCursor cursorSkipEmpty(final GridCursor cur) { - if (!grp.supportsTombstone()) - return cur; - - return new GridCursor() { - /** */ - CacheDataRow next; - - /** {@inheritDoc} */ - @Override public boolean next() throws IgniteCheckedException { - while (cur.next()) { - CacheDataRow next = cur.get(); - - // If request cursor with RowData.TOMBSTONES, then for non-tombtones all fields are null. - if (next.version() != null) { - this.next = next; - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow get() { - return next; - } - }; - } - - /** - * @param cur Cursor. - * @return Cursor skipping tombstone entries. - */ - private GridCursor cursorSkipTombstone(final GridCursor cur) { - if (!grp.supportsTombstone()) - return cur; - - return new GridCursor() { - /** */ - CacheDataRow next; - - /** {@inheritDoc} */ - @Override public boolean next() throws IgniteCheckedException { - while (cur.next()) { - CacheDataRow next = cur.get(); - - if (!isTombstone(next)) { - this.next = next; - - return true; - } - } - - return false; - } - - /** {@inheritDoc} */ - @Override public CacheDataRow get() { - return next; - } - }; + @Override public GridCursor cursor() throws IgniteCheckedException { + return dataTree.find(null, null); } /** {@inheritDoc} */ - @Override public GridCursor cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException { - GridCursor cur = dataTree.find(null, null, x); - - return x == CacheDataRowAdapter.RowData.TOMBSTONES ? cursorSkipEmpty(cur) : cursorSkipTombstone(cur); + @Override public GridCursor cursor(Object x) throws IgniteCheckedException { + return dataTree.find(null, null, x); } /** {@inheritDoc} */ @Override public GridCursor cursor(MvccSnapshot mvccSnapshot) throws IgniteCheckedException { + GridCursor cursor; if (mvccSnapshot != null) { assert grp.mvccEnabled(); @@ -3090,20 +2856,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager new MvccFirstVisibleRowTreeClosure(grp.singleCacheContext(), mvccSnapshot), null); } else - cursor = cursorSkipTombstone(dataTree.find(null, null)); + cursor = dataTree.find(null, null); return cursor; } /** {@inheritDoc} */ - @Override public GridCursor cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException { - return cursor(cacheId, null, null, null, null, withTombstones); + @Override public GridCursor cursor(int cacheId) throws IgniteCheckedException { + return cursor(cacheId, null, null); } /** {@inheritDoc} */ @Override public GridCursor cursor(int cacheId, MvccSnapshot mvccSnapshot) throws IgniteCheckedException { - return cursor(cacheId, null, null, null, mvccSnapshot, false); + return cursor(cacheId, null, null, null, mvccSnapshot); } /** {@inheritDoc} */ @@ -3114,18 +2880,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridCursor cursor(int cacheId, KeyCacheObject lower, - KeyCacheObject upper, CacheDataRowAdapter.RowData x) throws IgniteCheckedException { - return cursor(cacheId, lower, upper, null, null, false); + KeyCacheObject upper, Object x) throws IgniteCheckedException { + return cursor(cacheId, lower, upper, null, null); } /** {@inheritDoc} */ - @Override public GridCursor cursor(int cacheId, - KeyCacheObject lower, - KeyCacheObject upper, - CacheDataRowAdapter.RowData x, - MvccSnapshot snapshot, - boolean withTombstones - ) throws IgniteCheckedException { + @Override public GridCursor cursor(int cacheId, KeyCacheObject lower, + KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException { SearchRow lowerRow; SearchRow upperRow; @@ -3149,13 +2910,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager cursor = dataTree.find(lowerRow, upperRow, new MvccFirstVisibleRowTreeClosure(cctx, snapshot), x); } - else { + else cursor = dataTree.find(lowerRow, upperRow, x); - if (!withTombstones) - cursor = cursorSkipTombstone(cursor); - } - return cursor; } @@ -3195,7 +2952,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager Exception ex = null; GridCursor cur = - cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY, null, true); + cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY); while (cur.next()) { CacheDataRow row = cur.get(); @@ -3239,24 +2996,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param size Size to init. * @param updCntr Update counter. * @param cacheSizes Cache sizes if store belongs to group containing multiple caches. - * @param updCntrGapsData Update counters gaps raw data. - * @param tombstonesCnt Tombstones count. + * @param cntrUpdData Counter updates. */ - public void restoreState( - long size, - long updCntr, - Map cacheSizes, - byte[] updCntrGapsData, - long tombstonesCnt - ) { - pCntr.init(updCntr, updCntrGapsData); + public void restoreState(long size, long updCntr, @Nullable Map cacheSizes, byte[] cntrUpdData) { + pCntr.init(updCntr, cntrUpdData); storageSize.set(size); - for (Map.Entry e : cacheSizes.entrySet()) - this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue())); - - this.tombstonesCnt.set(tombstonesCnt); + if (cacheSizes != null) { + for (Map.Entry e : cacheSizes.entrySet()) + this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue())); + } } /** {@inheritDoc} */ @@ -3279,25 +3029,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return null; } - /** {@inheritDoc} */ - @Override public long tombstonesCount() { - return tombstonesCnt.get(); - } - - /** - * Called when tombstone has removed from partition. - */ - private void tombstoneRemoved() { - tombstonesCnt.decrementAndGet(); - } - - /** - * Called when tombstone has created in partition. - */ - private void tombstoneCreated() { - tombstonesCnt.incrementAndGet(); - } - /** * @param cctx Cache context. * @param key Key. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java index 3802c2b..dedb3bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java @@ -43,8 +43,6 @@ public class IncompleteCacheObject extends IncompleteObject { if (buf.remaining() >= HEAD_LEN) { data = new byte[buf.getInt()]; type = buf.get(); - - headerReady(); } // We cannot fully read head to initialize data buffer. // Start partial read of header. @@ -70,8 +68,6 @@ public class IncompleteCacheObject extends IncompleteObject { data = new byte[headBuf.getInt()]; type = headBuf.get(); - - headerReady(); } } @@ -80,21 +76,6 @@ public class IncompleteCacheObject extends IncompleteObject { } /** - * Invoke when object header is ready. - */ - private void headerReady() { - if (type == CacheObject.TOMBSTONE) - object(TombstoneCacheObject.INSTANCE); - } - - /** - * @return Size of already read data. - */ - public int dataOffset() { - return off; - } - - /** * @return Data type. */ public byte type() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java index 27c9def..7c24c12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java @@ -33,7 +33,7 @@ public class IncompleteObject { private T obj; /** */ - protected int off; + private int off; /** * @param data Data bytes. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java index 1d41d7f..112a110 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java @@ -35,9 +35,9 @@ public interface PartitionUpdateCounter extends Iterable { * Restores update counter state. * * @param initUpdCntr LWM. - * @param updCntrGapsData Updates counters gaps raw data. + * @param cntrUpdData Counter updates raw data. */ - public void init(long initUpdCntr, @Nullable byte[] updCntrGapsData); + public void init(long initUpdCntr, @Nullable byte[] cntrUpdData); /** * @deprecated TODO LWM should be used as initial counter https://ggsystems.atlassian.net/browse/GG-17396 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java deleted file mode 100644 index d87b024..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TombstoneCacheObject.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.IOException; -import java.io.ObjectInput; -import org.apache.ignite.IgniteCheckedException; -import org.jetbrains.annotations.Nullable; - -/** - * Special value object indicating that value is removed. - */ -public class TombstoneCacheObject extends CacheObjectAdapter { - /** */ - private static final long serialVersionUID = 2106775575127797257L; - - /** Empty. */ - private static final byte[] EMPTY = new byte[] { }; - - /** Instance. */ - public static final TombstoneCacheObject INSTANCE = new TombstoneCacheObject(); - - /** - * Default constructor. - */ - public TombstoneCacheObject() { - valBytes = EMPTY; - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - valBytes = EMPTY; - } - - /** {@inheritDoc} */ - @Override public @Nullable T value(CacheObjectValueContext ctx, boolean cpy) { - return null; - } - - /** {@inheritDoc} */ - @Override public byte[] valueBytes(CacheObjectValueContext ctx) throws IgniteCheckedException { - return valBytes; - } - - /** {@inheritDoc} */ - @Override public byte cacheObjectType() { - return CacheObject.TOMBSTONE; - } - - /** {@inheritDoc} */ - @Override public boolean isPlatformType() { - return true; - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - return this; - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { - - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(CacheObjectValueContext ctx) throws IgniteCheckedException { - - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 176; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8c8a3ef..ac2d237 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.binary; +import javax.cache.CacheException; import java.io.File; import java.io.Serializable; import java.math.BigDecimal; @@ -31,7 +32,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.cache.CacheException; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -83,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.processors.cache.TombstoneCacheObject; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl; @@ -1146,9 +1145,6 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme case CacheObject.TYPE_REGULAR: return new CacheObjectImpl(null, bytes); - - case CacheObject.TOMBSTONE: - return TombstoneCacheObject.INSTANCE; } throw new IllegalArgumentException("Invalid object type: " + type); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 4f7986f..0eed54a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -43,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -54,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -63,9 +61,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.collection.IntMap; import org.apache.ignite.internal.util.collection.IntRWHashMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; -import org.apache.ignite.internal.util.lang.GridIteratorAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -178,10 +174,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param recovery Flag indicates that partition is created during recovery phase. */ public GridDhtLocalPartition( - GridCacheSharedContext ctx, - CacheGroupContext grp, - int id, - boolean recovery + GridCacheSharedContext ctx, + CacheGroupContext grp, + int id, + boolean recovery ) { super(ENTRY_FACTORY); @@ -606,12 +602,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements assert partState == MOVING || partState == LOST; - if (casState(state, OWNING)) { - if (hasTombstones()) - clearTombstonesAsync(); - + if (casState(state, OWNING)) return true; - } } } @@ -765,21 +757,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * @return {@code True} if partition has tombstone entries. - */ - boolean hasTombstones() { - return grp.supportsTombstone() && dataStore().tombstonesCount() > 0; - } - - /** - * Adds async task that will clear tombstone entries from partition. - * @see #clearTombstones(EvictionContext). - */ - void clearTombstonesAsync() { - grp.shared().evict().clearTombstonesAsync(grp, this); - } - - /** * Continues delayed clearing of partition if possible. * Clearing may be delayed because of existing reservations. */ @@ -944,7 +921,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code false} if clearing is not started due to existing reservations. * @throws NodeStoppingException If node is stopping. */ - public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException, IgniteCheckedException { + public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException { if (clearFuture.isDone()) return true; @@ -955,73 +932,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (addEvicting()) { try { - GridCacheVersion clearVer = ctx.versions().next(); - - GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); - - boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - - if (grp.sharedGroup()) - cacheMaps.forEach((key, hld) -> clearOnheapEntries(hld.map, extras, rec)); - else - clearOnheapEntries(singleCacheEntryMap.map, extras, rec); - // Attempt to evict partition entries from cache. - long clearedEntities = doClear( - evictionCtx, - 1000, - grp.offheap().partitionIterator(id, true), - (hld, row) -> { - // Do not clear fresh rows in case of partition reloading. - // This is required because normal updates are possible to moving partition which is currently cleared. - if (row.version().compareTo(clearVer) >= 0 && state() == MOVING) - return false; - - GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( - hld, - hld.cctx, - grp.affinity().lastVersion(), - row.key(), - true, - false); - - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - removeEntry(cached); - - if (rec && !hld.cctx.config().isEventsDisabled()) { - hld.cctx.events().addEvent(cached.partition(), - cached.key(), - ctx.localNodeId(), - null, - null, - null, - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - null, - null, - null, - false); - } - - return true; - } - - return false; - } - ); - - if (forceTestCheckpointOnEviction) { - if (partWhereTestCheckpointEnforced == null && clearedEntities >= fullSize()) { - ctx.database().forceCheckpoint("test").finishFuture().get(); - - log.warning("Forced checkpoint by test reasons for partition: " + this); - - partWhereTestCheckpointEnforced = id; - } - } + long clearedEntities = clearAll(evictionCtx); if (log.isDebugEnabled()) log.debug("Partition has been cleared [grp=" + grp.cacheOrGroupName() @@ -1208,120 +1120,113 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Iterates over partition entries and removes tombstone entries. + * Removes all entries and rows from this partition. * - * @param evictionCtx Eviction context. + * @return Number of rows cleared from page memory. + * @throws NodeStoppingException If node stopping. */ - void clearTombstones(EvictionContext evictionCtx) throws IgniteCheckedException { - if (evictionCtx.shouldStop()) - return; + private long clearAll(EvictionContext evictionCtx) throws NodeStoppingException { + GridCacheVersion clearVer = ctx.versions().next(); - GridIterator iter; + GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); - try { - GridCursor cur = store.cursor(CacheDataRowAdapter.RowData.TOMBSTONES); + boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - iter = new GridIteratorAdapter() { - @Override public boolean hasNextX() throws IgniteCheckedException { - return cur.next(); - } + if (grp.sharedGroup()) + cacheMaps.forEach((key, hld) -> clear(hld.map, extras, rec)); + else + clear(singleCacheEntryMap.map, extras, rec); - @Override public CacheDataRow nextX() throws IgniteCheckedException { - return cur.get(); - } + long cleared = 0; - @Override public void removeX() throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - }; - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to get iterator for partition: " + id, e); - } + final int stopCheckingFreq = 1000; - doClear( - evictionCtx, - 10, - iter, - (hld, row) -> { - while (true) { - GridCacheMapEntry cached = null; + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; - try { - cached = putEntryIfObsoleteOrAbsent( - hld, - hld.cctx, - grp.affinity().lastVersion(), - row.key(), - true, - false); + try { + GridIterator it0 = grp.offheap().partitionIterator(id); - cached.removeTombstone(row.version()); + while (it0.hasNext()) { + ctx.database().checkpointReadLock(); - return true; - } - catch (GridCacheEntryRemovedException e) { - cached = null; - } - finally { - if (cached != null) - cached.touch(); - } - } - } - ); - } + try { + CacheDataRow row = it0.next(); - /** - * Runs abstract clear operation over partition data rows. - * - * @param evictionCtx Eviction context. - * @param stopCheckingFreq Frequency to check stopping eviction/clearing. - * @param rowIter Rows iterator. - * @param clearOp Clear operation. - * @return Number of cleared rows. - * @throws IgniteCheckedException If failed. - */ - private long doClear( - EvictionContext evictionCtx, - int stopCheckingFreq, - GridIterator rowIter, - ClearRowOperation clearOp - ) throws IgniteCheckedException { - long cleared = 0; + // Do not clear fresh rows in case of partition reloading. + // This is required because normal updates are possible to moving partition which is currently cleared. + if (row.version().compareTo(clearVer) >= 0 && state() == MOVING) + continue; - CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) + hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); - while (rowIter.hasNext()) { - ctx.database().checkpointReadLock(); + assert hld != null; - try { - CacheDataRow row = rowIter.next(); + GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, + grp.affinity().lastVersion(), + row.key(), + true, + false); - assert row.key() != null : row; - assert row.version() != null : row; + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + removeEntry(cached); - if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) - hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); + if (rec && !hld.cctx.config().isEventsDisabled()) { + hld.cctx.events().addEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + null, + null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, + false); + } - assert hld != null; + cleared++; + } - if (clearOp.apply(hld, row)) - cleared++; + // For each 'stopCheckingFreq' cleared entities check clearing process to stop. + if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop()) + return cleared; + } + catch (GridDhtInvalidPartitionException e) { + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - // For each 'stopCheckingFreq' cleared entities check clearing process to stop. - if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop()) - return cleared; + break; // Partition is already concurrently cleared and evicted. + } + finally { + ctx.database().checkpointReadUnlock(); + } } - catch (GridDhtInvalidPartitionException e) { - assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - break; // Partition is already concurrently cleared and evicted. - } - finally { - ctx.database().checkpointReadUnlock(); + if (forceTestCheckpointOnEviction) { + if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) { + ctx.database().forceCheckpoint("test").finishFuture().get(); + + log.warning("Forced checkpoint by test reasons for partition: " + this); + + partWhereTestCheckpointEnforced = id; + } } } + catch (NodeStoppingException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get iterator for evicted partition: " + id); + + throw e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get iterator for evicted partition: " + id, e); + } return cleared; } @@ -1334,11 +1239,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param evt Unload event flag. * @throws NodeStoppingException If current node is stopping. */ - private void clearOnheapEntries( - ConcurrentMap map, + private void clear(ConcurrentMap map, GridCacheObsoleteEntryExtras extras, - boolean evt - ) throws NodeStoppingException { + boolean evt) throws NodeStoppingException { Iterator it = map.values().iterator(); while (it.hasNext()) { @@ -1639,18 +1542,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Abstract operation to clear row. - */ - @FunctionalInterface - private static interface ClearRowOperation { - /** - * @param hld Hld. - * @param row Row. - */ - boolean apply(CacheMapHolder hld, CacheDataRow row) throws IgniteCheckedException; - } - - /** * Future is needed to control partition clearing process. * Future can be used both for single clearing or eviction processes. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 1f90108..2078eab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -716,10 +716,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { updateLocal(p, state, updateSeq, topVer); - if (state == RENTING) // Restart cleaning. + // Restart cleaning. + if (state == RENTING) locPart.clearAsync(); - else if (state == OWNING && locPart.hasTombstones()) - locPart.clearTombstonesAsync(); // Restart tombstones cleaning. } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java index 2d3b813..31399bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -108,74 +107,35 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { } /** - * @param grp Group context. - * @param part Partition to clear tombstones. - */ - public void clearTombstonesAsync(CacheGroupContext grp, GridDhtLocalPartition part) { - if (addAsyncTask(grp, part, TaskType.CLEAR_TOMBSTONES)) { - if (log.isDebugEnabled()) - log.debug("Partition has been scheduled for tomstones cleanup [grp=" + grp.cacheOrGroupName() - + ", p=" + part.id() + ", state=" + part.state() + "]"); - } - } - - /** * Adds partition to eviction queue and starts eviction process if permit available. * * @param grp Group context. * @param part Partition to evict. */ public void evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part) { - if (addAsyncTask(grp, part, TaskType.EVICT)) { - if (log.isDebugEnabled()) - log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName() - + ", p=" + part.id() + ", state=" + part.state() + "]"); - } - } - - /** - * @param grp Group context. - * @param part Partition. - * @param type Task type. - * @return {@code True} if task was added. - */ - private boolean addAsyncTask(CacheGroupContext grp, GridDhtLocalPartition part, TaskType type) { GroupEvictionContext grpEvictionCtx = evictionGroupsMap.computeIfAbsent( grp.groupId(), (k) -> new GroupEvictionContext(grp)); // Check node stop. if (grpEvictionCtx.shouldStop()) - return false; + return; int bucket; - AbstractEvictionTask task; - - switch (type) { - case EVICT: - task = new PartitionEvictionTask(part, grpEvictionCtx); - break; - - case CLEAR_TOMBSTONES: - task = new ClearTombstonesTask(part, grpEvictionCtx); - break; - - default: - throw new UnsupportedOperationException("Unsupported task type: " + type); - } - synchronized (mux) { - if (!grpEvictionCtx.taskIds.add(task.id)) - return false; + if (!grpEvictionCtx.partIds.add(part.id())) + return; - bucket = evictionQueue.offer(task); + bucket = evictionQueue.offer(new PartitionEvictionTask(part, grpEvictionCtx)); } - grpEvictionCtx.taskAdded(task); + grpEvictionCtx.totalTasks.incrementAndGet(); - scheduleNextTask(bucket); + if (log.isDebugEnabled()) + log.debug("Partition has been scheduled for eviction [grp=" + grp.cacheOrGroupName() + + ", p=" + part.id() + ", state=" + part.state() + "]"); - return true; + scheduleNextPartitionEviction(bucket); } /** @@ -183,7 +143,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * * @param bucket Bucket. */ - private void scheduleNextTask(int bucket) { + private void scheduleNextPartitionEviction(int bucket) { // Check node stop. if (sharedEvictionCtx.shouldStop()) return; @@ -198,7 +158,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { // Get task until we have permits. while (permits >= 0) { // Get task from bucket. - AbstractEvictionTask evictionTask = evictionQueue.poll(bucket); + PartitionEvictionTask evictionTask = evictionQueue.poll(bucket); // If bucket empty try get from another. if (evictionTask == null) { @@ -238,12 +198,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { permits++; } - // Re-schedule new one task for same bucket. - scheduleNextTask(bucket); + // Re-schedule new one task form same bucket. + scheduleNextPartitionEviction(bucket); }); // Submit task to executor. - cctx.kernalContext() + cctx.kernalContext() .closure() .runLocalSafe(evictionTask, EVICT_POOL_PLC); } @@ -259,10 +219,10 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { int size = evictionQueue.size() + 1; // Queue size plus current partition. if (log.isInfoEnabled()) - log.info("Partition cleanup in progress [permits=" + permits+ + log.info("Eviction in progress [permits=" + permits+ ", threads=" + threads + ", groups=" + evictionGroupsMap.keySet().size() + - ", remainingTasks=" + size + "]"); + ", remainingPartsToEvict=" + size + "]"); evictionGroupsMap.values().forEach(GroupEvictionContext::showProgress); @@ -308,66 +268,30 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { /** * */ - private static class TasksStatistics { - /** */ - private int total; - - /** */ - private int inProgress; - - /** - * - */ - void taskAdded() { - total++; - } - - /** - * - */ - void taskStarted() { - inProgress++; - } - - /** - * - */ - void taskFinished() { - total--; - inProgress--; - } - } - - /** - * - */ private class GroupEvictionContext implements EvictionContext { /** */ private final CacheGroupContext grp; - /** Deduplicate set partition tasks. */ - private final Set taskIds = new HashSet<>(); + /** Deduplicate set partition ids. */ + private final Set partIds = new HashSet<>(); /** Future for currently running partition eviction task. */ - private final Map> taskFutures = new ConcurrentHashMap<>(); + private final Map> partsEvictFutures = new ConcurrentHashMap<>(); /** Flag indicates that eviction process has stopped for this group. */ private volatile boolean stop; - /** Total tasks. */ + /** Total partition to evict. */ private AtomicInteger totalTasks = new AtomicInteger(); - /** */ - private Map stats = U.newHashMap(TaskType.VALS.length); + /** Total partition evict in progress. */ + private int taskInProgress; /** * @param grp Group context. */ private GroupEvictionContext(CacheGroupContext grp) { this.grp = grp; - - for (TaskType type : TaskType.VALS) - stats.put(type, new TasksStatistics()); } /** {@inheritDoc} */ @@ -376,37 +300,28 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { } /** - * @param task Task. - */ - void taskAdded(AbstractEvictionTask task) { - totalTasks.incrementAndGet(); - - synchronized (this) { - stats.get(task.id.type).taskAdded(); - } - } - - /** * * @param task Partition eviction task. */ - private synchronized void taskScheduled(AbstractEvictionTask task) { + private synchronized void taskScheduled(PartitionEvictionTask task) { if (shouldStop()) return; - stats.get(task.id.type).taskStarted(); + taskInProgress++; GridFutureAdapter fut = task.finishFut; - taskIds.remove(task.id); + int partId = task.part.id(); - taskFutures.put(task.id, fut); + partIds.remove(partId); + + partsEvictFutures.put(partId, fut); fut.listen(f -> { synchronized (this) { - stats.get(task.id.type).taskFinished(); + taskInProgress--; - taskFutures.remove(task.id, f); + partsEvictFutures.remove(partId, f); if (totalTasks.decrementAndGet() == 0) evictionGroupsMap.remove(grp.groupId()); @@ -425,7 +340,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * Await evict finish. */ private void awaitFinishAll(){ - taskFutures.forEach(this::awaitFinish); + partsEvictFutures.forEach(this::awaitFinish); evictionGroupsMap.remove(grp.groupId()); } @@ -433,17 +348,17 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { /** * Await evict finish partition. */ - private void awaitFinish(TaskId taskId, IgniteInternalFuture fut) { + private void awaitFinish(Integer part, IgniteInternalFuture fut) { // Wait for last offered partition eviction completion try { - log.info("Await partition cleanup [grpName=" + grp.cacheOrGroupName() + - ", grpId=" + grp.groupId() + ", task=" + taskId.type + ", partId=" + taskId.part + ']'); + log.info("Await partition evict, grpName=" + grp.cacheOrGroupName() + + ", grpId=" + grp.groupId() + ", partId=" + part); fut.get(); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) - log.warning("Failed to await partition cleanup during stopping.", e); + log.warning("Failed to await partition eviction during stopping.", e); } } @@ -451,132 +366,47 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * Shows progress group of eviction. */ private void showProgress() { - if (log.isInfoEnabled()) { - StringBuilder msg = new StringBuilder( - "Group cleanup in progress [grpName=" + grp.cacheOrGroupName() + ", grpId=" + grp.groupId()); - - synchronized (this) { - TasksStatistics evicts = stats.get(TaskType.EVICT); - if (evicts.total > 0) { - msg.append(", remainingPartsToEvict=" + (evicts.total - evicts.inProgress)). - append(", partsEvictInProgress=" + evicts.inProgress); - } - - TasksStatistics tombstones = stats.get(TaskType.CLEAR_TOMBSTONES); - if (tombstones.total > 0) { - msg.append(", remainingPartsToClearTombstones=" + (tombstones.total - tombstones.inProgress)). - append(", tombstoneClearInProgress=" + tombstones.inProgress); - } - } - - msg.append(", totalParts=" + grp.topology().localPartitions().size() + "]"); - - log.info(msg.toString()); - } - } - } - - /** - * - */ - private enum TaskType { - /** */ - EVICT, - - /** */ - CLEAR_TOMBSTONES; - - /** */ - private static TaskType[] VALS = values(); - } - - /** - * - */ - private static class TaskId { - /** */ - final int part; - - /** */ - final TaskType type; - - /** - * @param part Partiotion id. - * @param type Task type. - */ - TaskId(int part, TaskType type) { - this.part = part; - this.type = type; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TaskId taskKey = (TaskId)o; - - return part == taskKey.part && type == taskKey.type; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(part, type); + if (log.isInfoEnabled()) + log.info("Group eviction in progress [grpName=" + grp.cacheOrGroupName()+ + ", grpId=" + grp.groupId() + + ", remainingPartsToEvict=" + (totalTasks.get() - taskInProgress) + + ", partsEvictInProgress=" + taskInProgress + + ", totalParts=" + grp.topology().localPartitions().size() + "]"); } } /** - * + * Task for self-scheduled partition eviction / clearing. */ - abstract class AbstractEvictionTask implements Runnable { + class PartitionEvictionTask implements Runnable { /** Partition to evict. */ - protected final GridDhtLocalPartition part; + private final GridDhtLocalPartition part; /** */ - protected final long size; + private final long size; /** Eviction context. */ - protected final GroupEvictionContext grpEvictionCtx; + private final GroupEvictionContext grpEvictionCtx; /** */ - protected final GridFutureAdapter finishFut = new GridFutureAdapter<>(); - - /** */ - private final TaskId id; + private final GridFutureAdapter finishFut = new GridFutureAdapter<>(); /** * @param part Partition. * @param grpEvictionCtx Eviction context. */ - private AbstractEvictionTask( - GridDhtLocalPartition part, - GroupEvictionContext grpEvictionCtx, - TaskType type + private PartitionEvictionTask( + GridDhtLocalPartition part, + GroupEvictionContext grpEvictionCtx ) { this.part = part; this.grpEvictionCtx = grpEvictionCtx; - id = new TaskId(part.id(), type); - size = part.fullSize(); } - /** - * @return {@code False} if need retry task later. - * @throws IgniteCheckedException If failed. - */ - abstract boolean run0() throws IgniteCheckedException; - - /** - * - */ - abstract void scheduleRetry(); - /** {@inheritDoc} */ - @Override public final void run() { + @Override public void run() { if (grpEvictionCtx.shouldStop()) { finishFut.onDone(); @@ -584,7 +414,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { } try { - boolean success = run0(); + boolean success = part.tryClear(grpEvictionCtx); + + if (success) { + if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) + part.destroy(); + } // Complete eviction future before schedule new to prevent deadlock with // simultaneous eviction stopping and scheduling new eviction. @@ -592,7 +427,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { // Re-offer partition if clear was unsuccessful due to partition reservation. if (!success) - scheduleRetry(); + evictPartitionAsync(grpEvictionCtx.grp, part); } catch (Throwable ex) { finishFut.onDone(ex); @@ -603,7 +438,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { true); } else { - LT.error(log, ex, "Partition eviction failed."); + LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, ex)); } @@ -612,80 +447,15 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { } /** - * Task for self-scheduled partition eviction / clearing. - */ - class PartitionEvictionTask extends AbstractEvictionTask { - /** - * @param part Partition. - * @param grpEvictionCtx Eviction context. - */ - private PartitionEvictionTask( - GridDhtLocalPartition part, - GroupEvictionContext grpEvictionCtx - ) { - super(part, grpEvictionCtx, TaskType.EVICT); - } - - /** {@inheritDoc} */ - @Override void scheduleRetry() { - evictPartitionAsync(grpEvictionCtx.grp, part); - } - - /** {@inheritDoc} */ - @Override public boolean run0() throws IgniteCheckedException { - assert part.state() != GridDhtPartitionState.OWNING : part; - - boolean success = part.tryClear(grpEvictionCtx); - - assert part.state() != GridDhtPartitionState.OWNING : part; - - if (success) { - if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) - part.destroy(); - } - - return success; - } - } - - /** - * - */ - class ClearTombstonesTask extends AbstractEvictionTask { - /** - * @param part Partition. - * @param grpEvictionCtx Eviction context. - */ - private ClearTombstonesTask( - GridDhtLocalPartition part, - GroupEvictionContext grpEvictionCtx - ) { - super(part, grpEvictionCtx, TaskType.CLEAR_TOMBSTONES); - } - - /** {@inheritDoc} */ - @Override void scheduleRetry() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean run0() throws IgniteCheckedException { - part.clearTombstones(grpEvictionCtx); - - return true; - } - } - - /** * */ class BucketQueue { - /** Queues contains partitions scheduled for eviction. */ - final Queue[] buckets; - /** */ private final long[] bucketSizes; + /** Queues contains partitions scheduled for eviction. */ + final Queue[] buckets; + /** * @param buckets Number of buckets. */ @@ -704,8 +474,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * @param bucket Bucket index. * @return Partition evict task, or {@code null} if bucket queue is empty. */ - AbstractEvictionTask poll(int bucket) { - AbstractEvictionTask task = buckets[bucket].poll(); + PartitionEvictionTask poll(int bucket) { + PartitionEvictionTask task = buckets[bucket].poll(); if (task != null) bucketSizes[bucket] -= task.size; @@ -718,7 +488,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * * @return Partition evict task. */ - AbstractEvictionTask pollAny() { + PartitionEvictionTask pollAny() { for (int bucket = 0; bucket < bucketSizes.length; bucket++){ if (!buckets[bucket].isEmpty()) return poll(bucket); @@ -733,7 +503,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * @param task Eviction task. * @return Bucket index. */ - int offer(AbstractEvictionTask task) { + int offer(PartitionEvictionTask task) { int bucket = calculateBucket(); buckets[bucket].offer(task); @@ -757,7 +527,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { int size(){ int size = 0; - for (Queue queue : buckets) + for (Queue queue : buckets) size += queue.size(); return size; @@ -787,7 +557,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter { * * @return Queue for evict partitions. */ - private Queue createEvictPartitionQueue() { + private Queue createEvictPartitionQueue() { switch (QUEUE_TYPE) { case 1: return new PriorityBlockingQueue<>( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java index 0b7c4ac..746b94a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java @@ -59,11 +59,6 @@ public interface CacheDataRow extends MvccUpdateVersionAware, CacheSearchRow, St */ public void key(KeyCacheObject key); - /** - * @param cacheId Cache ID. - */ - public void cacheId(int cacheId); - /** {@inheritDoc} */ @Override public default IOVersions ioVersions() { return DataPageIO.VERSIONS; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index a3b876d..06e7214 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -52,7 +52,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CR import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA; import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY; import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.LINK_WITH_HEADER; -import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.TOMBSTONES; /** * Cache data row adapter. @@ -352,7 +351,9 @@ public class CacheDataRowAdapter implements CacheDataRow { buf.position(off); buf.limit(off + payloadSize); - incomplete = readFragment(sharedCtx, coctx, buf, rowData, readCacheId, incomplete, skipVer); + boolean keyOnly = rowData == RowData.KEY_ONLY; + + incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete, skipVer); if (incomplete != null) incomplete.setNextLink(nextLink); @@ -378,7 +379,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param sharedCtx Cache shared context. * @param coctx Cache object context. * @param buf Buffer. - * @param rowData Required row data. + * @param keyOnly {@code true} If need to read only key object. * @param readCacheId {@code true} If need to read cache ID. * @param incomplete Incomplete object. * @param skipVer Whether version read should be skipped. @@ -389,13 +390,11 @@ public class CacheDataRowAdapter implements CacheDataRow { GridCacheSharedContext sharedCtx, CacheObjectContext coctx, ByteBuffer buf, - RowData rowData, + boolean keyOnly, boolean readCacheId, IncompleteObject incomplete, boolean skipVer ) throws IgniteCheckedException { - boolean tombstones = rowData == TOMBSTONES; - if (readCacheId && cacheId == 0) { incomplete = readIncompleteCacheId(buf, incomplete); @@ -417,12 +416,6 @@ public class CacheDataRowAdapter implements CacheDataRow { // Read key. if (key == null) { - if (tombstones && sharedCtx.database().isTombstone(buf, key, (IncompleteCacheObject)incomplete) == Boolean.FALSE) { - verReady = true; - - return null; - } - incomplete = readIncompleteKey(coctx, buf, (IncompleteCacheObject)incomplete); if (key == null) { @@ -430,7 +423,7 @@ public class CacheDataRowAdapter implements CacheDataRow { return incomplete; // Need to finish reading the key. } - if (rowData == RowData.KEY_ONLY) + if (keyOnly) return null; // Key is ready - we are done! incomplete = null; @@ -449,13 +442,6 @@ public class CacheDataRowAdapter implements CacheDataRow { // Read value. if (val == null) { - if (tombstones && sharedCtx.database().isTombstone(buf, key, (IncompleteCacheObject)incomplete) == Boolean.FALSE) { - key = null; - verReady = true; - - return null; - } - incomplete = readIncompleteValue(coctx, buf, (IncompleteCacheObject)incomplete); if (val == null) { @@ -466,14 +452,6 @@ public class CacheDataRowAdapter implements CacheDataRow { incomplete = null; } - if (tombstones && !sharedCtx.database().isTombstone(this)) { - key = null; - val = null; - verReady = true; - - return null; - } - // Read version. if (!verReady) { incomplete = readIncompleteVersion(buf, incomplete, skipVer); @@ -520,14 +498,6 @@ public class CacheDataRowAdapter implements CacheDataRow { int len = PageUtils.getInt(addr, off); off += 4; - boolean tombstones = rowData == RowData.TOMBSTONES; - - if (tombstones && !sharedCtx.database().isTombstone(addr + off + len + 1)) { - verReady = true; // Mark as ready, no need to read any data. - - return; - } - if (rowData != RowData.NO_KEY && rowData != RowData.NO_KEY_WITH_HINTS) { byte type = PageUtils.getByte(addr, off); off++; @@ -549,14 +519,11 @@ public class CacheDataRowAdapter implements CacheDataRow { byte type = PageUtils.getByte(addr, off); off++; - if (!tombstones) { - byte[] bytes = PageUtils.getBytes(addr, off, len); - - val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes); - } - + byte[] bytes = PageUtils.getBytes(addr, off, len); off += len; + val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes); + int verLen; if (skipVer) { @@ -854,11 +821,6 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** {@inheritDoc} */ - @Override public void cacheId(int cacheId) { - this.cacheId = cacheId; - } - - /** {@inheritDoc} */ @Override public CacheObject value() { assert val != null : "Value is not ready: " + this; @@ -974,10 +936,7 @@ public class CacheDataRowAdapter implements CacheDataRow { FULL_WITH_HINTS, /** Force instant hints actualization for update operation with history (to avoid races with vacuum). */ - NO_KEY_WITH_HINTS, - - /** Do not read row data for non-tombstone entries. */ - TOMBSTONES + NO_KEY_WITH_HINTS } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index df19bd5..b321321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -52,7 +52,7 @@ import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV3; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV2; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager; @@ -297,10 +297,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (rowStore0 != null) { ((CacheFreeList)rowStore0.freeList()).saveMetadata(grp.statisticsHolderData()); + PartitionMetaStorage partStore = store.partStorage(); + long updCntr = store.updateCounter(); long size = store.fullSize(); long rmvId = globalRemoveId().get(); + byte[] updCntrsBytes = store.partUpdateCounter().getBytes(); + PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); IgniteWriteAheadLogManager wal = this.ctx.wal(); @@ -325,9 +329,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return; } - assert state != null || grp.isLocal() : "Partition state is undefined " + - "[grp=" + grp.cacheOrGroupName() + ", part=" + part + "]"; - int grpId = grp.groupId(); long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); @@ -348,196 +349,141 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple try { PagePartitionMetaIOV2 io = PageIO.getPageIO(partMetaPageAddr); - changed |= io.setPartitionState(partMetaPageAddr, state != null ? (byte)state.ordinal() : -1); - changed |= io.setUpdateCounter(partMetaPageAddr, updCntr); - changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId); - changed |= io.setSize(partMetaPageAddr, size); - changed |= io.setTombstonesCount(partMetaPageAddr, store.tombstonesCount()); - changed |= savePartitionUpdateCounterGaps(store, io, partMetaPageAddr); - changed |= saveCacheSizes(store, io, partMetaPageAddr); - - if (needSnapshot) - changed |= savePagesCount(ctx, part, store, io, partMetaPageAddr); - - if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) - wal.log(new MetaPageUpdatePartitionDataRecordV3( - grpId, - partMetaId, - updCntr, - rmvId, - (int)size, // TODO: Partition size may be long - io.getCacheSizesPageId(partMetaPageAddr), - io.getPartitionState(partMetaPageAddr), - io.getCandidatePageCount(partMetaPageAddr), - io.getGapsLink(partMetaPageAddr), - io.getTombstonesCount(partMetaPageAddr) - )); - } - finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - } - else if (needSnapshot) - tryAddEmptyPartitionToSnapshot(store, ctx); - } - else if (needSnapshot) - tryAddEmptyPartitionToSnapshot(store, ctx); - } + long link = io.getGapsLink(partMetaPageAddr); - /** - * Saves to partition meta page information about partition update counter gaps. - * - * @param store Partition data store. - * @param io I/O for partition meta page. - * @param partMetaPageAddr Partition meta page address. - * @return {@code True} if partition meta data is changed. - * @throws IgniteCheckedException If failed. - */ - private boolean savePartitionUpdateCounterGaps( - CacheDataStore store, - PagePartitionMetaIOV2 io, - long partMetaPageAddr - ) throws IgniteCheckedException { - PartitionMetaStorage partStore = store.partStorage(); + if (updCntrsBytes == null && link != 0) { + partStore.removeDataRowByLink(link, grp.statisticsHolderData()); - byte[] updCntrsBytes = store.partUpdateCounter().getBytes(); + io.setGapsLink(partMetaPageAddr, (link = 0)); - long gapsLink = io.getGapsLink(partMetaPageAddr); + changed = true; + } + else if (updCntrsBytes != null && link == 0) { + SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes); - boolean changed = false; + partStore.insertDataRow(row, grp.statisticsHolderData()); - if (updCntrsBytes == null && gapsLink != 0) { - partStore.removeDataRowByLink(gapsLink, grp.statisticsHolderData()); + io.setGapsLink(partMetaPageAddr, (link = row.link())); - io.setGapsLink(partMetaPageAddr, 0); + changed = true; + } + else if (updCntrsBytes != null && link != 0) { + byte[] prev = partStore.readRow(link); - changed = true; - } - else if (updCntrsBytes != null && gapsLink == 0) { - SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes); + assert prev != null : "Read null gaps using link=" + link; - partStore.insertDataRow(row, grp.statisticsHolderData()); + if (!Arrays.equals(prev, updCntrsBytes)) { + partStore.removeDataRowByLink(link, grp.statisticsHolderData()); - io.setGapsLink(partMetaPageAddr, row.link()); + SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes); - changed = true; - } - else if (updCntrsBytes != null && gapsLink != 0) { - byte[] prev = partStore.readRow(gapsLink); + partStore.insertDataRow(row, grp.statisticsHolderData()); - assert prev != null : "Read null gaps using link=" + gapsLink; + io.setGapsLink(partMetaPageAddr, (link = row.link())); - if (!Arrays.equals(prev, updCntrsBytes)) { - partStore.removeDataRowByLink(gapsLink, grp.statisticsHolderData()); + changed = true; + } + } - SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes); + if (changed) + partStore.saveMetadata(grp.statisticsHolderData()); - partStore.insertDataRow(row, grp.statisticsHolderData()); + changed |= io.setUpdateCounter(partMetaPageAddr, updCntr); + changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId); + changed |= io.setSize(partMetaPageAddr, size); - io.setGapsLink(partMetaPageAddr, row.link()); + if (state != null) + changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal()); + else + assert grp.isLocal() : grp.cacheOrGroupName(); - changed = true; - } - } + long cntrsPageId; - if (changed) - partStore.saveMetadata(grp.statisticsHolderData()); + if (grp.sharedGroup()) { + long initCntrPageId = io.getCountersPageId(partMetaPageAddr); - return changed; - } + Map newSizes = store.cacheSizes(); + Map prevSizes = readSharedGroupCacheSizes(pageMem, grpId, initCntrPageId); - /** - * Saves to partition meta page information about logical cache sizes inside cache group. - * - * @param store Partition data store. - * @param io I/O for partition meta page. - * @param partMetaPageAddr Partition meta page address. - * @return {@code True} if partition meta data is changed. - * @throws IgniteCheckedException If failed. - */ - private boolean saveCacheSizes( - CacheDataStore store, - PagePartitionMetaIOV2 io, - long partMetaPageAddr - ) throws IgniteCheckedException { - if (grp.sharedGroup()) { - PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - - long oldCacheSizesPageId = io.getCacheSizesPageId(partMetaPageAddr); + if (prevSizes != null && prevSizes.equals(newSizes)) + cntrsPageId = initCntrPageId; // Preventing modification of sizes pages for store + else { + cntrsPageId = writeSharedGroupCacheSizes(pageMem, grpId, initCntrPageId, + store.partId(), newSizes); - Map newSizes = store.cacheSizes(); - Map prevSizes = readSharedGroupCacheSizes(pageMem, grp.groupId(), oldCacheSizesPageId); + if (initCntrPageId == 0 && cntrsPageId != 0) { + io.setCountersPageId(partMetaPageAddr, cntrsPageId); - if (prevSizes == null || !prevSizes.equals(newSizes)) { - long cacheSizesPageId = writeSharedGroupCacheSizes(pageMem, grp.groupId(), oldCacheSizesPageId, - store.partId(), newSizes); + changed = true; + } + } + } + else + cntrsPageId = 0L; + + int pageCnt; + + if (needSnapshot) { + pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); + + io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt); + + if (state == OWNING) { + assert part != null; + + if (!addPartition( + part, + ctx.partitionStatMap(), + partMetaPageAddr, + io, + grpId, + store.partId(), + this.ctx.pageStore().pages(grpId, store.partId()), + store.fullSize() + )) + U.warn(log, "Partition was concurrently evicted grpId=" + grpId + + ", partitionId=" + part.id()); + } + else if (state == MOVING || state == RENTING) { + if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) { + if (log.isInfoEnabled()) + log.info("Will not include SQL indexes to snapshot because there is " + + "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() + + ", partId=" + store.partId() + ", state=" + state + ']'); + } + } - if (oldCacheSizesPageId == 0 && cacheSizesPageId != 0) { - io.setSizesPageId(partMetaPageAddr, cacheSizesPageId); + changed = true; + } + else + pageCnt = io.getCandidatePageCount(partMetaPageAddr); - return true; + if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) + wal.log(new MetaPageUpdatePartitionDataRecordV2( + grpId, + partMetaId, + updCntr, + rmvId, + (int)size, // TODO: Partition size may be long + cntrsPageId, + state == null ? -1 : (byte)state.ordinal(), + pageCnt, + link + )); + } + finally { + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); + } + } + finally { + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } + else if (needSnapshot) + tryAddEmptyPartitionToSnapshot(store, ctx); } - else - io.setSizesPageId(partMetaPageAddr, 0); - - return false; - } - - /** - * Saves to partition meta page information about pages count. - * - * @param ctx Checkpoint context. - * @param part Partition. - * @param store Partition data store. - * @param io I/O for partition meta page. - * @param partMetaPageAddr Partition meta page address. - * @return {@code True} if partition meta data is changed. - * @throws IgniteCheckedException If failed. - */ - private boolean savePagesCount( - Context ctx, - GridDhtLocalPartition part, - CacheDataStore store, - PagePartitionMetaIOV2 io, - long partMetaPageAddr - ) throws IgniteCheckedException { - int grpId = grp.groupId(); - int pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); - - io.setCandidatePageCount(partMetaPageAddr, io.getSize(partMetaPageAddr) == 0 ? 0 : pageCnt); - - if (part.state() == OWNING) { - assert part != null; - - if (!addPartition( - part, - ctx.partitionStatMap(), - partMetaPageAddr, - io, - grpId, - store.partId(), - this.ctx.pageStore().pages(grp.groupId(), store.partId()), - store.fullSize() - )) - U.warn(log, "Partition was concurrently evicted grpId=" + grpId + - ", partitionId=" + part.id()); - } - else if (part.state() == MOVING || part.state() == RENTING) { - if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) { - if (log.isInfoEnabled()) - log.info("Will not include SQL indexes to snapshot because there is " + - "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() + - ", partId=" + store.partId() + ", state=" + part.state() + ']'); - } - } - - return true; + else if (needSnapshot) + tryAddEmptyPartitionToSnapshot(store, ctx); } /** {@inheritDoc} */ @@ -701,13 +647,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * return null if counter page does not exist. * @throws IgniteCheckedException If page memory operation failed. */ - private static Map readSharedGroupCacheSizes( - PageSupport pageMem, - int grpId, - long cntrsPageId - ) throws IgniteCheckedException { + @Nullable private static Map readSharedGroupCacheSizes(PageSupport pageMem, int grpId, + long cntrsPageId) throws IgniteCheckedException { + if (cntrsPageId == 0L) - return Collections.emptyMap(); + return null; Map cacheSizes = new HashMap<>(); @@ -740,7 +684,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple pageMem.releasePage(grpId, curId, curPage); } } - return cacheSizes; } @@ -1547,11 +1490,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void cacheId(int cacheId) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { return 0; // TODO IGNITE-7384 } @@ -1884,19 +1822,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (PageIO.getType(pageAddr) != 0) { PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.latest(); - long gapsLink = io.getGapsLink(pageAddr); + Map cacheSizes = null; - byte[] updCntrGapsData = gapsLink == 0 ? null : partStorage.readRow(gapsLink); + if (grp.sharedGroup()) + cacheSizes = readSharedGroupCacheSizes(pageMem, grpId, io.getCountersPageId(pageAddr)); - delegate0.restoreState( - io.getSize(pageAddr), - io.getUpdateCounter(pageAddr), - grp.sharedGroup() - ? readSharedGroupCacheSizes(pageMem, grpId, io.getCacheSizesPageId(pageAddr)) - : Collections.emptyMap(), - updCntrGapsData, - io.getTombstonesCount(pageAddr) - ); + long link = io.getGapsLink(pageAddr); + + byte[] data = link == 0 ? null : partStorage.readRow(link); + + delegate0.restoreState(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes, data); globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); } @@ -2496,20 +2431,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void removeWithTombstone( - GridCacheContext cctx, - KeyCacheObject key, - GridCacheVersion ver, - GridDhtLocalPartition part - ) throws IgniteCheckedException { - assert ctx.database().checkpointLockIsHeldByThread(); - - CacheDataStore delegate = init0(false); - - delegate.removeWithTombstone(cctx, key, ver, part); - } - - /** {@inheritDoc} */ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(true); @@ -2543,7 +2464,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public GridCursor mvccAllVersionsCursor(GridCacheContext cctx, - KeyCacheObject key, CacheDataRowAdapter.RowData x) throws IgniteCheckedException { + KeyCacheObject key, Object x) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) @@ -2554,17 +2475,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ - @Override public GridCursor cursor(boolean withTombstones) throws IgniteCheckedException { + @Override public GridCursor cursor() throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(withTombstones); + return delegate.cursor(); return EMPTY_CURSOR; } /** {@inheritDoc} */ - @Override public GridCursor cursor(CacheDataRowAdapter.RowData x) throws IgniteCheckedException { + @Override public GridCursor cursor(Object x) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) @@ -2601,7 +2522,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @Override public GridCursor cursor(int cacheId, KeyCacheObject lower, KeyCacheObject upper, - CacheDataRowAdapter.RowData x) + Object x) throws IgniteCheckedException { CacheDataStore delegate = init0(true); @@ -2615,15 +2536,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @Override public GridCursor cursor(int cacheId, KeyCacheObject lower, KeyCacheObject upper, - CacheDataRowAdapter.RowData x, - MvccSnapshot mvccSnapshot, - boolean withTombstones - ) + Object x, + MvccSnapshot mvccSnapshot) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot, withTombstones); + return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot); return EMPTY_CURSOR; } @@ -2634,11 +2553,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public GridCursor cursor(int cacheId, boolean withTombstones) throws IgniteCheckedException { + @Override public GridCursor cursor(int cacheId) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(cacheId, withTombstones); + return delegate.cursor(cacheId); return EMPTY_CURSOR; } @@ -2849,25 +2768,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } } - /** {@inheritDoc} */ @Override public PartitionMetaStorage partStorage() { return partStorage; } - - /** {@inheritDoc} */ - @Override public long tombstonesCount() { - try { - CacheDataStore delegate0 = init0(true); - - if (delegate0 == null) - return 0; - - return delegate0.tombstonesCount(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 48a7f20..a1a7913 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence; +import javax.management.InstanceNotFoundException; import java.io.File; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import javax.management.InstanceNotFoundException; import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.DataRegionMetricsProvider; import org.apache.ignite.DataStorageMetrics; @@ -49,16 +48,12 @@ import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker; import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker; @@ -141,6 +136,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** First eviction was warned flag. */ private volatile boolean firstEvictWarn; + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getDataStorageConfiguration() == null) @@ -158,102 +154,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @param row Row. - * @return {@code True} if given row is tombstone. - * @throws IgniteCheckedException If failed. - */ - public boolean isTombstone(@Nullable CacheDataRow row) throws IgniteCheckedException { - if (row == null) - return false; - - CacheObject val = row.value(); - - assert val != null : row; - - return val.cacheObjectType() == CacheObject.TOMBSTONE; - } - - /** - * @param buf Buffer. - * @param key Row key. - * @param incomplete Incomplete object. - * @return Tombstone flag or {@code null} if there is no enough data. - */ - public Boolean isTombstone( - ByteBuffer buf, - @Nullable KeyCacheObject key, - @Nullable IncompleteCacheObject incomplete - ) { - if (key == null) { - if (incomplete == null) { // Did not start read key yet. - if (buf.remaining() < IncompleteCacheObject.HEAD_LEN) - return null; - - int keySize = buf.getInt(buf.position()); - - int headOffset = (IncompleteCacheObject.HEAD_LEN + keySize) /* key */ + - 8 /* expire time */; - - int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header. - - if (buf.remaining() < requiredSize) - return null; - - return isTombstone(buf, headOffset); - } - else { // Reading key, check if there is enogh data to check value header. - byte[] data = incomplete.data(); - - if (data == null) // Header is not available yet. - return null; - - int keyRemaining = data.length - incomplete.dataOffset(); - - assert keyRemaining > 0 : keyRemaining; - - int headOffset = keyRemaining + 8 /* expire time */; - - int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header. - - if (buf.remaining() < requiredSize) - return null; - - return isTombstone(buf, headOffset); - } - } - - if (incomplete == null) { // Did not start read value yet. - if (buf.remaining() < IncompleteCacheObject.HEAD_LEN) - return null; - - return isTombstone(buf, 0); - } - - return incomplete.type() == CacheObject.TOMBSTONE; - } - - /** - * @param buf Buffer. - * @param offset Value offset. - * @return Tombstone flag or {@code null} if there is no enough data. - */ - private Boolean isTombstone(ByteBuffer buf, int offset) { - byte valType = buf.get(buf.position() + offset + 4); - - return valType == CacheObject.TOMBSTONE; - } - - /** - * @param addr Row address. - * @return {@code True} if stored value is tombstone. - */ - public boolean isTombstone(long addr) { - byte type = PageUtils.getByte(addr, 4); - - return type == CacheObject.TOMBSTONE; - } - - /** * @param cfg Ignite configuration. * @param groupName Name of group. * @param dataRegionName Metrics MBean name. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java index 931fd92..c86a64a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java @@ -59,7 +59,7 @@ public class PagePartitionMetaIO extends PageMetaIO { setUpdateCounter(pageAddr, 0); setGlobalRemoveId(pageAddr, 0); setPartitionState(pageAddr, (byte)-1); - setSizesPageId(pageAddr, 0); + setCountersPageId(pageAddr, 0); } /** @@ -153,22 +153,22 @@ public class PagePartitionMetaIO extends PageMetaIO { } /** - * Returns page identifier related to page with logical cache sizes in cache group. + * Returns partition counters page identifier, page with caches in cache group sizes. * * @param pageAddr Partition metadata page address. * @return Next meta partial page ID or {@code 0} if it does not exist. */ - public long getCacheSizesPageId(long pageAddr) { + public long getCountersPageId(long pageAddr) { return PageUtils.getLong(pageAddr, NEXT_PART_META_PAGE_OFF); } /** - * Sets new reference to page with logical cache sizes in cache group. + * Sets new reference to partition counters page (logical cache sizes). * * @param pageAddr Partition metadata page address. * @param cntrsPageId New cache sizes page ID. */ - public void setSizesPageId(long pageAddr, long cntrsPageId) { + public void setCountersPageId(long pageAddr, long cntrsPageId) { PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId); } @@ -228,23 +228,6 @@ public class PagePartitionMetaIO extends PageMetaIO { "this PagePartitionMetaIO version: ver=" + getVersion()); } - /** - * @param pageAddr Page address. - */ - public long getTombstonesCount(long pageAddr) { - throw new UnsupportedOperationException("Tombstones count is not supported by " + - "this PagePartitionMetaIO version: ver=" + getVersion()); - } - - /** - * @param pageAddr Page address. - * @param tombstonesCount Tombstones count. - */ - public boolean setTombstonesCount(long pageAddr, long tombstonesCount) { - throw new UnsupportedOperationException("Tombstones count is not supported by " + - "this PagePartitionMetaIO version: ver=" + getVersion()); - } - /** {@inheritDoc} */ @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { super.printPage(pageAddr, pageSize, sb); @@ -255,7 +238,7 @@ public class PagePartitionMetaIO extends PageMetaIO { .a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr)) .a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr)) .a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")") - .a(",\n\tcacheSizesPageId=").a(getCacheSizesPageId(pageAddr)) + .a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr)) .a("\n]"); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java index e915e0b..37b7243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.util.GridStringBuilder; @@ -36,9 +37,6 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO { /** */ private static final int GAPS_LINK = PART_META_REUSE_LIST_ROOT_OFF + 8; - /** */ - private static final int TOMBSTONES_COUNT = GAPS_LINK + 8; - /** * @param ver Version. */ @@ -102,22 +100,7 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO { } /** {@inheritDoc} */ - @Override public long getTombstonesCount(long pageAddr) { - return PageUtils.getLong(pageAddr, TOMBSTONES_COUNT); - } - - /** {@inheritDoc} */ - @Override public boolean setTombstonesCount(long pageAddr, long tombstonesCnt) { - if (getTombstonesCount(pageAddr) == tombstonesCnt) - return false; - - PageUtils.putLong(pageAddr, TOMBSTONES_COUNT, tombstonesCnt); - - return true; - } - - /** {@inheritDoc} */ - @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) { + @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { byte state = getPartitionState(pageAddr); sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr)); @@ -132,9 +115,8 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO { sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr)); sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr)); sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")"); - sb.a(",\n\tcacheSizesPageId=").a(getCacheSizesPageId(pageAddr)); + sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr)); sb.a(",\n\tcntrUpdDataPageId=").a(getGapsLink(pageAddr)); - sb.a(",\n\ttombstonesCount=").a(getTombstonesCount(pageAddr)); sb.a("\n]"); } @@ -151,6 +133,5 @@ public class PagePartitionMetaIOV2 extends PagePartitionMetaIO { setPendingTreeRoot(pageAddr, 0); setPartitionMetaStoreReuseListRoot(pageAddr, 0); setGapsLink(pageAddr, 0); - setTombstonesCount(pageAddr, 0); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index 08e980f..ec68972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -71,7 +71,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV2; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV3; import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord; @@ -380,10 +379,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer { return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1 + /*allocatedIdxCandidate*/ 4 + /*link*/ 8; - case PARTITION_META_PAGE_UPDATE_COUNTERS_V3: - return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1 - + /*allocatedIdxCandidate*/ 4 + /*link*/ 8 + /*tombstones cnt*/ 8; - case MEMORY_RECOVERY: return 8; @@ -616,11 +611,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer { break; - case PARTITION_META_PAGE_UPDATE_COUNTERS_V3: - res = new MetaPageUpdatePartitionDataRecordV3(in); - - break; - case MEMORY_RECOVERY: long ts = in.readLong(); @@ -1212,7 +1202,6 @@ public class RecordDataV1Serializer implements RecordDataSerializer { case PARTITION_META_PAGE_UPDATE_COUNTERS: case PARTITION_META_PAGE_UPDATE_COUNTERS_V2: - case PARTITION_META_PAGE_UPDATE_COUNTERS_V3: ((MetaPageUpdatePartitionDataRecord)rec).toBytes(buf); break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java index 5eb50f8..add2abe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java @@ -120,8 +120,10 @@ public class DataRow extends CacheDataRowAdapter { this.link = link; } - /** {@inheritDoc} */ - @Override public void cacheId(int cacheId) { + /** + * @param cacheId Cache ID. + */ + public void cacheId(int cacheId) { this.cacheId = cacheId; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java index d80e33b..fad16b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/MetricUtils.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.metric.impl; import java.util.Map; -import org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl; import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.typedef.T2; @@ -87,14 +86,6 @@ public class MetricUtils { } /** - * @param cacheGrpName Cache group name. - * @return Cache group metrics registry name. - */ - public static String cacheGroupMetricsRegistryName(String cacheGrpName) { - return metricName(CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX, cacheGrpName); - } - - /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java index 8c2bd1d..69a19f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteSanitySelfTest.java @@ -51,10 +51,10 @@ public class CacheDeferredDeleteSanitySelfTest extends GridCommonAbstractTest { testDeferredDelete(LOCAL, TRANSACTIONAL, false, false); testDeferredDelete(PARTITIONED, ATOMIC, false, true); - testDeferredDelete(PARTITIONED, TRANSACTIONAL, false, false); + testDeferredDelete(PARTITIONED, TRANSACTIONAL, false, true); testDeferredDelete(REPLICATED, ATOMIC, false, true); - testDeferredDelete(REPLICATED, TRANSACTIONAL, false, false); + testDeferredDelete(REPLICATED, TRANSACTIONAL, false, true); // Near testDeferredDelete(LOCAL, ATOMIC, true, false); @@ -64,7 +64,7 @@ public class CacheDeferredDeleteSanitySelfTest extends GridCommonAbstractTest { testDeferredDelete(PARTITIONED, TRANSACTIONAL, true, false); testDeferredDelete(REPLICATED, ATOMIC, true, true); - testDeferredDelete(REPLICATED, TRANSACTIONAL, true, false); + testDeferredDelete(REPLICATED, TRANSACTIONAL, true, true); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index e0ad965..3103f8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -6718,12 +6718,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract GridCacheContext ctx = ((IgniteKernal)ignite).internalCache(DEFAULT_CACHE_NAME).context(); - if (ctx.isNear()) - ctx = ctx.near().dht().context(); + GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); - GridCacheEntryEx entry = ctx.cache().peekEx(key); - - if (ctx.deferredDelete() && ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { + if (ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { assertNotNull(entry); assertTrue(entry.deleted()); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index 51a8af5..b653d01 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -6623,7 +6623,7 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); - if (ctx.deferredDelete() && ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { + if (ignite.affinity(cacheName).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { assertNotNull(entry); assertTrue(entry.deleted()); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java deleted file mode 100644 index d066153..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesLoadTest.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.Ignition; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.metric.impl.MetricUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.metric.LongMetric; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.MvccFeatureChecker; -import org.apache.ignite.testframework.junits.WithSystemProperty; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; - -/** - * - */ -@RunWith(Parameterized.class) -public class CacheRemoveWithTombstonesLoadTest extends GridCommonAbstractTest { - /** Dummy data. */ - private static final byte[] DUMMY_DATA = {}; - - /** Test parameters. */ - @Parameterized.Parameters(name = "persistenceEnabled={0}, historicalRebalance={1}") - public static Collection parameters() { - List res = new ArrayList<>(); - - for (boolean persistenceEnabled : new boolean[] {false, true}) { - for (boolean histRebalance : new boolean[] {false, true}) { - if (!persistenceEnabled && histRebalance) - continue; - - res.add(new Object[]{persistenceEnabled, histRebalance}); - } - } - - return res; - } - - /** */ - @Parameterized.Parameter(0) - public boolean persistence; - - /** */ - @Parameterized.Parameter(1) - public boolean histRebalance; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setConsistentId(gridName); - - DataStorageConfiguration dsCfg = new DataStorageConfiguration(); - - if (persistence) { - dsCfg.setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setInitialSize(256L * 1024 * 1024) - .setMaxSize(256L * 1024 * 1024) - .setPersistenceEnabled(true)) - .setWalMode(WALMode.LOG_ONLY); - } - - dsCfg.setPageSize(1024); - - cfg.setDataStorageConfiguration(dsCfg); - - // Throttle rebalance. - cfg.setRebalanceThrottle(100); - - return cfg; - } - - /** - * - */ - @BeforeClass - public static void beforeTests() { - Assume.assumeFalse(MvccFeatureChecker.forcedMvcc()); - } - - /** - * - */ - @Before - public void before() throws Exception { - cleanPersistenceDir(); - - stopAllGrids(); - - if (histRebalance) - System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); - } - - /** - * - */ - @After - public void after() throws Exception { - if (histRebalance) - System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); - - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** - * @throws Exception If failed. - */ - @Test - @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false") - public void removeAndRebalance() throws Exception { - IgniteEx ignite0 = startGrid(0); - - IgniteCache cache0; - - final int ADD_NODES = persistence ? 2 : 3; - final int KEYS = persistence ? 5_000 : 10_000; - - if (persistence) { - // Preload initial data to all nodes to have start point for WAL rebalance. - for (int i = 0, idx = 1; i < ADD_NODES; i++, idx++) - startGrid(idx); - - ignite0.cluster().active(true); - - awaitPartitionMapExchange(); - - cache0 = ignite0.getOrCreateCache(cacheConfiguration()); - - for (int k = 0; k < KEYS; k++) - cache0.put(new TestKey(k, DUMMY_DATA), new TestValue(DUMMY_DATA)); - - forceCheckpoint(); - - for (int i = 0, idx = 1; i < ADD_NODES; i++, idx++) { - stopGrid(idx); - - awaitPartitionMapExchange(); - } - } - - final int pageSize = ignite0.configuration().getDataStorageConfiguration().getPageSize(); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - List keys = new ArrayList<>(); - - Map data = new HashMap<>(); - - for (int i = 0; i < KEYS; i++) { - TestKey key = new TestKey(i, new byte[rnd.nextInt(pageSize * 3)]); - - keys.add(key); - - data.put(key, new TestValue(new byte[rnd.nextInt(pageSize * 3)])); - } - - cache0 = ignite0.getOrCreateCache(cacheConfiguration()); - - cache0.putAll(data); - - AtomicInteger nodeIdx = new AtomicInteger(); - - for (int iter = 0; iter < ADD_NODES; iter++) { - IgniteInternalFuture nodeStartFut = GridTestUtils.runAsync(() -> { - int idx = nodeIdx.incrementAndGet(); - - info("Start node: " + idx); - - U.sleep(500); - - return startGrid(idx); - }); - - long endTime = U.currentTimeMillis() + 5_000; - - while (U.currentTimeMillis() < endTime) { - for (int i = 0; i < 100; i++) { - TestKey key = keys.get(rnd.nextInt(keys.size())); - - if (rnd.nextBoolean()) { - cache0.remove(key); - - data.remove(key); - } - else { - TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]); - - cache0.put(key, val); - data.put(key, val); - } - - U.sleep(10); - } - } - - nodeStartFut.get(30_000); - - checkData(keys, data); - - waitTombstoneCleanup(); - - checkData(keys, data); - } - - awaitPartitionMapExchange(); - - for (int iter = 0; iter < ADD_NODES; iter++) { - IgniteInternalFuture nodeStopFut = GridTestUtils.runAsync(() -> { - int idx = nodeIdx.getAndDecrement(); - - info("Stop node: " + idx); - - stopGrid(idx); - - awaitPartitionMapExchange(); - - return null; - }); - - long endTime = U.currentTimeMillis() + 2_500; - - while (U.currentTimeMillis() < endTime) { - for (int i = 0; i < 100; i++) { - TestKey key = keys.get(rnd.nextInt(keys.size())); - - if (rnd.nextBoolean()) { - cache0.remove(key); - - data.remove(key); - } else { - TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]); - - cache0.put(key, val); - data.put(key, val); - } - } - - U.sleep(10); - } - - nodeStopFut.get(30_000); - - checkData(keys, data); - - waitTombstoneCleanup(); - - checkData(keys, data); - } - } - - /** - * @param keys Keys to check. - * @param data Expected data. - */ - private void checkData(List keys, Map data) { - for (Ignite node : Ignition.allGrids()) { - info("Check node: " + node.name()); - - IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); - - for (TestKey key : keys) { - TestValue expVal = data.get(key); - TestValue val = cache.get(key); - - if (expVal == null) - assertNull(val); - else { - assertNotNull(val); - assertTrue(Arrays.equals(expVal.dummyData, val.dummyData)); - } - } - } - } - - /** - * @throws Exception If failed. - */ - private void waitTombstoneCleanup() throws Exception { - for (Ignite node : Ignition.allGrids()) { - final LongMetric tombstones = ((IgniteEx)node).context().metric().registry( - MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones"); - - GridTestUtils.waitForCondition(() -> tombstones.value() == 0, 30_000); - - assertEquals("Failed to wait for tombstone cleanup: " + node.name(), 0, tombstones.value()); - } - } - - /** - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); - - ccfg.setAtomicityMode(TRANSACTIONAL); - ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(2); - ccfg.setRebalanceMode(SYNC); - ccfg.setReadFromBackup(true); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setAffinity(new RendezvousAffinityFunction(false, 64)); - - return ccfg; - } - - /** - * - */ - static class TestKey { - /** */ - private final int id; - - /** */ - private final byte[] dummyData; - - /** - * @param id ID. - * @param dummyData Dummy byte array (to test with various key sizes). - */ - public TestKey(int id, byte[] dummyData) { - this.id = id; - this.dummyData = dummyData; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - TestKey testKey = (TestKey) o; - - return id == testKey.id && Arrays.equals(dummyData, testKey.dummyData); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = Objects.hash(id); - result = 31 * result + Arrays.hashCode(dummyData); - return result; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "TestKey [id=" + id + "]"; - } - } - - /** - * - */ - static class TestValue { - /** */ - private final byte[] dummyData; - - /** - * @param dummyData Dummy byte array (to test with various value sizes). - */ - public TestValue(byte[] dummyData) { - this.dummyData = dummyData; - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java deleted file mode 100644 index c8b0a7f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; -import org.apache.ignite.internal.processors.metric.impl.MetricUtils; -import org.apache.ignite.spi.metric.LongMetric; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; - -/** - * - */ -@RunWith(Parameterized.class) -public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { - /** Test parameters. */ - @Parameterized.Parameters(name = "persistenceEnabled={0}, historicalRebalance={1}") - public static Collection parameters() { - List res = new ArrayList<>(); - - for (boolean persistenceEnabled : new boolean[] {false, true}) { - for (boolean histRebalance : new boolean[] {false, true}) { - if (!persistenceEnabled && histRebalance) - continue; - - res.add(new Object[]{persistenceEnabled, histRebalance}); - } - } - - return res; - } - - /** */ - @Parameterized.Parameter(0) - public boolean persistence; - - /** */ - @Parameterized.Parameter(1) - public boolean histRebalance; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); - - cfg.setConsistentId(gridName); - - cfg.setCommunicationSpi(commSpi); - - if (persistence) { - DataStorageConfiguration dsCfg = new DataStorageConfiguration() - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setInitialSize(256L * 1024 * 1024) - .setMaxSize(256L * 1024 * 1024) - .setPersistenceEnabled(true) - ) - .setWalMode(WALMode.LOG_ONLY); - - cfg.setDataStorageConfiguration(dsCfg); - } - - return cfg; - } - - /** - * - */ - @Before - public void before() throws Exception { - stopAllGrids(); - - cleanPersistenceDir(); - - if (histRebalance) - System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); - } - - /** - * - */ - @After - public void after() throws Exception { - if (histRebalance) - System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); - - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testRemoveAndRebalanceRaceTx() throws Exception { - testRemoveAndRebalanceRace(TRANSACTIONAL, true); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testRemoveAndRebalanceRaceAtomic() throws Exception { - testRemoveAndRebalanceRace(ATOMIC, false); - } - - /** - * @throws Exception If failed. - * @param expTombstone {@code True} if tombstones should be created. - */ - private void testRemoveAndRebalanceRace(CacheAtomicityMode atomicityMode, boolean expTombstone) throws Exception { - IgniteEx ignite0 = startGrid(0); - - if (histRebalance) - startGrid(1); - - if (persistence) - ignite0.cluster().active(true); - - IgniteCache cache0 = ignite0.createCache(cacheConfiguration(atomicityMode)); - - final int KEYS = histRebalance ? 1024 : 1024 * 256; - - if (histRebalance) { - // Preload initial data to have start point for WAL rebalance. - try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) { - streamer.allowOverwrite(true); - - for (int i = 0; i < KEYS; i++) - streamer.addData(-i, 0); - } - - forceCheckpoint(); - - stopGrid(1); - } - - // This data will be rebalanced. - try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) { - streamer.allowOverwrite(true); - - for (int i = 0; i < KEYS; i++) - streamer.addData(i, i); - } - - blockRebalance(ignite0); - - IgniteEx ignite1 = GridTestUtils.runAsync(() -> startGrid(1)).get(10, TimeUnit.SECONDS); - - if (persistence) { - ignite0.cluster().baselineAutoAdjustEnabled(false); - - ignite0.cluster().setBaselineTopology(2); - } - - TestRecordingCommunicationSpi.spi(ignite0).waitForBlocked(); - - Set keysWithTombstone = new HashSet<>(); - - // Do removes while rebalance is in progress. - // All keys are removed during historical rebalance. - for (int i = 0, step = histRebalance ? 1 : 64; i < KEYS; i += step) { - keysWithTombstone.add(i); - - cache0.remove(i); - } - - final LongMetric tombstoneMetric0 = ignite0.context().metric().registry( - MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones"); - - final LongMetric tombstoneMetric1 = ignite1.context().metric().registry( - MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones"); - - // On first node there should not be tombstones. - assertEquals(0, tombstoneMetric0.value()); - - if (expTombstone) - assertEquals(keysWithTombstone.size(), tombstoneMetric1.value()); - else - assertEquals(0, tombstoneMetric1.value()); - - // Update some of removed keys, this should remove tombstones. - for (int i = 0; i < KEYS; i += 128) { - keysWithTombstone.remove(i); - - cache0.put(i, i); - } - - assertTrue("Keys with tombstones should exist", !keysWithTombstone.isEmpty()); - - assertEquals(0, tombstoneMetric0.value()); - - if (expTombstone) - assertEquals(keysWithTombstone.size(), tombstoneMetric1.value()); - else - assertEquals(0, tombstoneMetric1.value()); - - TestRecordingCommunicationSpi.spi(ignite0).stopBlock(); - - awaitPartitionMapExchange(); - - IgniteCache cache1 = ignite(1).cache(DEFAULT_CACHE_NAME); - - for (int i = 0; i < KEYS; i++) { - if (keysWithTombstone.contains(i)) - assertNull(cache1.get(i)); - else - assertEquals((Object)i, cache1.get(i)); - } - - // Tombstones should be removed after once rebalance is completed. - GridTestUtils.waitForCondition(() -> tombstoneMetric1.value() == 0, 30_000); - - assertEquals(0, tombstoneMetric1.value()); - } - - /** - * - */ - private static void blockRebalance(IgniteEx node) { - final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME); - - TestRecordingCommunicationSpi.spi(node).blockMessages((node0, msg) -> - (msg instanceof GridDhtPartitionSupplyMessage) - && ((GridCacheGroupIdMessage)msg).groupId() == grpId - ); - } - - /** - * @param atomicityMode Cache atomicity mode. - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) { - return new CacheConfiguration<>(DEFAULT_CACHE_NAME) - .setAtomicityMode(atomicityMode) - .setCacheMode(PARTITIONED) - .setBackups(2) - .setRebalanceMode(ASYNC) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setAffinity(new RendezvousAffinityFunction(false, 64)); - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java deleted file mode 100644 index db60ffc..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/CacheRemoveWithTombstonesFailoverTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.topology; - -import java.util.HashSet; -import java.util.Set; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; -import org.apache.ignite.internal.processors.metric.impl.MetricUtils; -import org.apache.ignite.spi.metric.LongMetric; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.WithSystemProperty; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; - -/** - * Tests to check failover scenarios over tombstones. - */ -public class CacheRemoveWithTombstonesFailoverTest extends PartitionsEvictManagerAbstractTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); - - cfg.setConsistentId(gridName); - - cfg.setCommunicationSpi(commSpi); - - DataStorageConfiguration dsCfg = new DataStorageConfiguration() - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setInitialSize(256L * 1024 * 1024) - .setMaxSize(256L * 1024 * 1024) - .setPersistenceEnabled(true) - ) - .setCheckpointFrequency(1024 * 1024 * 1024) - .setWalMode(WALMode.LOG_ONLY); - - cfg.setDataStorageConfiguration(dsCfg); - - cfg.setCacheConfiguration(cacheConfiguration()); - - return cfg; - } - - /** - * Test check that tombstones reside in persistent partition will be cleared after node restart. - */ - @Test - @WithSystemProperty(key = IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false") - public void testTombstonesClearedAfterRestart() throws Exception { - IgniteEx crd = startGrid(0); - - crd.cluster().active(true); - - final int KEYS = 1024; - - for (int k = 0; k < KEYS; k++) - crd.cache(DEFAULT_CACHE_NAME).put(k, k); - - blockRebalance(crd); - - IgniteEx node = startGrid(1); - - // Do not run clear tombsones task. - instrumentEvictionQueue(node, task -> { - if (task instanceof PartitionsEvictManager.ClearTombstonesTask) - return null; - - return task; - }); - - resetBaselineTopology(); - - TestRecordingCommunicationSpi.spi(crd).waitForBlocked(); - - Set keysWithTombstone = new HashSet<>(); - - // Do removes while rebalance is in progress. - for (int i = 0; i < KEYS; i += 2) { - keysWithTombstone.add(i); - - crd.cache(DEFAULT_CACHE_NAME).remove(i); - } - - final LongMetric tombstoneMetric = node.context().metric().registry( - MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones"); - - Assert.assertEquals(keysWithTombstone.size(), tombstoneMetric.value()); - - // Resume rebalance. - TestRecordingCommunicationSpi.spi(crd).stopBlock(); - - // Partitions should be in OWNING state. - awaitPartitionMapExchange(); - - // But tombstones removal should be skipped. - Assert.assertEquals(keysWithTombstone.size(), tombstoneMetric.value()); - - // Stop node with tombstones. - stopGrid(1); - - // Stop coordinator. - stopGrid(0); - - // Startup node with tombstones in inactive state. - node = startGrid(1); - - final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME); - - // Tombstone metrics are unavailable before join to topology, using internal api. - long tombstonesBeforeActivation = node.context().cache().cacheGroup(grpId).topology().localPartitions() - .stream().map(part -> part.dataStore().tombstonesCount()).reduce(Long::sum).orElse(0L); - - Assert.assertEquals(keysWithTombstone.size(), tombstonesBeforeActivation); - - crd = startGrid(0); - - crd.cluster().active(true); - - awaitPartitionMapExchange(); - - final LongMetric tombstoneMetric1 = node.context().metric().registry( - MetricUtils.cacheGroupMetricsRegistryName(DEFAULT_CACHE_NAME)).findMetric("Tombstones"); - - // Tombstones should be removed after join to topology. - GridTestUtils.waitForCondition(() -> tombstoneMetric1.value() == 0, 30_000); - - assertEquals(0, tombstoneMetric1.value()); - } - - /** - * - */ - private static void blockRebalance(IgniteEx node) { - final int grpId = groupIdForCache(node, DEFAULT_CACHE_NAME); - - TestRecordingCommunicationSpi.spi(node).blockMessages((node0, msg) -> - (msg instanceof GridDhtPartitionSupplyMessage) - && ((GridCacheGroupIdMessage)msg).groupId() == grpId - ); - } - - /** - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - return new CacheConfiguration<>(DEFAULT_CACHE_NAME) - .setAtomicityMode(TRANSACTIONAL) - .setCacheMode(PARTITIONED) - .setBackups(1) - .setRebalanceMode(ASYNC) - .setReadFromBackup(true) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setAffinity(new RendezvousAffinityFunction(false, 64)); - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java index 72a2853..4a7de04 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java @@ -39,15 +39,11 @@ public class DropCacheContextDuringEvictionTest extends PartitionsEvictManagerAb public void testDeactivation() throws Exception { T2 nodeAndEvictLatch = makeNodeWithEvictLatch(); - nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + IgniteCache cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) .setGroupName("test-grp")); - try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(DEFAULT_CACHE_NAME)) { - streamer.allowOverwrite(true); - - for (int k = 0; k < 100_000; k++) - streamer.addData(k, k); - } + for (int i = 0; i < 100_000; i++) + cache.put(i, i); doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().cluster().active(false)); @@ -64,19 +60,17 @@ public class DropCacheContextDuringEvictionTest extends PartitionsEvictManagerAb List caches = new ArrayList<>(); for (int idx = 0; idx < 10; idx++) { - String cacheName = DEFAULT_CACHE_NAME + idx; - - nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(cacheName) + IgniteCache cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME + idx) .setGroupName("test-grp")); - try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(cacheName)) { + caches.add(cache.getName()); + + try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(cache.getName())) { streamer.allowOverwrite(true); - for (int k = 0; k < 100_000; k++) - streamer.addData(k, k); + for (int i = 0; i < 200_000; i++) + streamer.addData(i, i); } - - caches.add(cacheName); } doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().destroyCaches(caches)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java index 233b25e..e49e07f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java @@ -17,22 +17,26 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.NotNull; /** * @@ -45,7 +49,8 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setActiveOnStart(false); + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); cfg.setFailureHandler(new AbstractFailureHandler() { /** {@inheritDoc} */ @@ -62,16 +67,12 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - stopAllGrids(); - cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); - - cleanPersistenceDir(); } /** @@ -91,43 +92,40 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr protected void awaitEvictionQueueForFilling(IgniteEx node, int ms) throws IgniteInterruptedCheckedException { PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue; - assertTrue(GridTestUtils.waitForCondition(() -> { - for (Queue queue : evictionQueue.buckets) - return ((InstrumentedEvictionQueue) queue).itemOffered; - - return false; - }, ms)); + assertTrue(GridTestUtils.waitForCondition(() -> !evictionQueue.isEmpty(), ms)); } /** * @param node Node. - * @param interceptor Interceptor that will be invoked after task from eviction has polled. + * @param latch Latch. + * @param completeWithError Inner future throws exception. */ - protected void instrumentEvictionQueue( - IgniteEx node, - IgniteClosure interceptor - ) { + protected void subscribeEvictionQueueAtLatch(IgniteEx node, CountDownLatch latch, boolean completeWithError) { PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue; Queue[] buckets = evictionQueue.buckets; for (int i = 0; i < buckets.length; i++) - buckets[i] = new InstrumentedEvictionQueue(interceptor); + buckets[i] = new WaitingQueue(latch, completeWithError); } /** * */ protected T2 makeNodeWithEvictLatch() throws Exception { + return makeNodeWithEvictLatch(false); + } + + /** + * + */ + protected T2 makeNodeWithEvictLatch(boolean completeWithError) throws Exception { IgniteEx node1 = startGrid(0); - CountDownLatch latch = new CountDownLatch(1); + node1.cluster().baselineAutoAdjustEnabled(false); - instrumentEvictionQueue(node1, task -> { - U.awaitQuiet(latch); + CountDownLatch latch = new CountDownLatch(1); - return task; - }); + subscribeEvictionQueueAtLatch(node1, latch, completeWithError); node1.cluster().active(true); @@ -139,7 +137,11 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr * @param r R. */ protected void doActionDuringEviction(T2 nodeAndEvictLatch, Runnable r) throws Exception { - startGrid(1); + IgniteEx node2 = startGrid(1); + + awaitPartitionMapExchange(); + + nodeAndEvictLatch.get1().cluster().setBaselineTopology(node2.cluster().topologyVersion()); awaitEvictionQueueForFilling(nodeAndEvictLatch.get1(), 100_000); @@ -151,38 +153,55 @@ public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstr } /** - * Queue that executes an interceptor during eviction task poll. + * Queue witch waits on the poll or breaks a PartitionEvictionTask. */ - private static class InstrumentedEvictionQueue extends LinkedBlockingQueue { - /** Interceptor. */ - private final IgniteClosure interceptor; + private class WaitingQueue extends LinkedBlockingQueue { + /** Latch. */ + private final CountDownLatch latch; - /** Empty indicator. */ - private volatile boolean itemOffered; + /** Complete with error. */ + private final boolean completeWithError; /** - * @param interceptor Interceptor. + * @param latch Latch. + * @param completeWithError flag. */ - private InstrumentedEvictionQueue(IgniteClosure interceptor - ) { - this.interceptor = interceptor; - } - - /** {@inheritDoc} */ - @Override public boolean offer(@NotNull Object o) { - itemOffered = true; - - return super.offer(o); + public WaitingQueue(CountDownLatch latch, boolean completeWithError) { + this.latch = latch; + this.completeWithError = completeWithError; } /** {@inheritDoc} */ @Override public Object poll() { + U.awaitQuiet(latch); + Object obj = super.poll(); - if (obj instanceof PartitionsEvictManager.AbstractEvictionTask) - return interceptor.apply((PartitionsEvictManager.AbstractEvictionTask) obj); + // This code uses for failure handler testing into PartitionEvictionTask. + if(obj != null && completeWithError) { + try { + Field field = U.findField(PartitionsEvictManager.PartitionEvictionTask.class, "finishFut"); + + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + field.set(obj, new GridFutureAdapter() { + @Override + protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) { + if (err == null) + throw new RuntimeException("TEST"); + + return super.onDone(res, err, cancel); + } + }); + } + catch (Exception e) { + fail(); + } + } return obj; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java index e78c61b..58c2460 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java @@ -17,16 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; -import org.apache.ignite.IgniteDataStreamer; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; @@ -34,69 +29,20 @@ import org.junit.Test; * */ public class PartitionsEvictionTaskFailureHandlerTest extends PartitionsEvictManagerAbstractTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); - - return cfg; - } - /** * */ @Test public void testEvictionTaskShouldCallFailureHandler() throws Exception { - IgniteEx node = startGrid(0); - - AtomicBoolean once = new AtomicBoolean(); - - // Partition eviction task should throw exception after completion. - instrumentEvictionQueue(node, task -> { - if (!(task instanceof PartitionsEvictManager.PartitionEvictionTask)) - return task; - - // Fail once. - if (!once.compareAndSet(false, true)) - return task; - - try { - Field field = U.findField(PartitionsEvictManager.PartitionEvictionTask.class, "finishFut"); - - field.setAccessible(true); - - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - - field.set(task, new GridFutureAdapter() { - @Override protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) { - if (err == null) - throw new RuntimeException("TEST"); - - return super.onDone(res, err, cancel); - } - }); - } - catch (Exception e) { - fail(); - } - - return task; - }); - - node.cluster().active(true); + T2 nodeAndEvictLatch = makeNodeWithEvictLatch(true); - try (IgniteDataStreamer streamer = node.dataStreamer(DEFAULT_CACHE_NAME)) { - streamer.allowOverwrite(true); + IgniteCache cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setGroupName("test-grp")); - for (int k = 0; k < 1024; k++) - node.cache(DEFAULT_CACHE_NAME).put(k, k); - } + for (int i = 0; i < 100_000; i++) + cache.put(i, i); - // Some partitions from node 0 should be evicted. - startGrid(1); + doActionDuringEviction(nodeAndEvictLatch, () -> {}); assertTrue(GridTestUtils.waitForCondition(() -> failure.get(), 10_000)); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java index 3ee9e95..e4a8c9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java @@ -624,11 +624,6 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void cacheId(int cacheId) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public long newMvccCoordinatorVersion() { return 0; } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java index 008d588..c1decfe 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite9.java @@ -22,31 +22,28 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest; +import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; -import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest; -import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxConcurrentRemoveObjectsTest; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.CacheRemoveWithTombstonesFailoverTest; import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyHistoryRebalanceTest; -import org.apache.ignite.internal.metric.IoStatisticsCachePersistenceSelfTest; -import org.apache.ignite.internal.metric.IoStatisticsCacheSelfTest; -import org.apache.ignite.testframework.junits.DynamicSuite; +import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyVolatileRebalanceTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryOneBackupTest; +import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest; -import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateOnePrimaryTwoBackupsTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStatePutTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateTwoPrimaryTwoBackupsTest; -import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateConsistencyTest; import org.apache.ignite.internal.processors.cache.transactions.TxPartitionCounterStateWithFilterTest; +import org.apache.ignite.testframework.junits.DynamicSuite; import org.junit.runner.RunWith; /** @@ -95,11 +92,6 @@ public class IgniteCacheMvccTestSuite9 { ignoredTests.add(IoStatisticsCacheSelfTest.class); ignoredTests.add(IoStatisticsCachePersistenceSelfTest.class); - // Tombstones are not created with mvcc. - ignoredTests.add(CacheRemoveWithTombstonesTest.class); - ignoredTests.add(CacheRemoveWithTombstonesLoadTest.class); - ignoredTests.add(CacheRemoveWithTombstonesFailoverTest.class); - return new ArrayList<>(IgniteCacheTestSuite9.suite(ignoredTests)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java index 4b669ca..d4dcf48 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java @@ -36,14 +36,11 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectio import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheOperationsInterruptTest; -import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesLoadTest; -import org.apache.ignite.internal.processors.cache.distributed.CacheRemoveWithTombstonesTest; import org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxConcurrentRemoveObjectsTest; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.CacheRemoveWithTombstonesFailoverTest; import org.apache.ignite.internal.processors.cache.transactions.PartitionUpdateCounterTest; import org.apache.ignite.internal.processors.cache.transactions.TxCrossCachePartitionConsistencyTest; import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest; @@ -129,10 +126,6 @@ public class IgniteCacheTestSuite9 { GridTestUtils.addTestIfNeeded(suite, FailBackupOnAtomicOperationTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesLoadTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, CacheRemoveWithTombstonesFailoverTest.class, ignoredTests); - return suite; } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index db85a7b..f9ab6a4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -129,7 +129,7 @@ public class H2PkHashIndex extends GridH2IndexBase { continue; if (filter == null || filter.applyPartition(part)) - cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot, false)); + cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot)); } return new H2PkHashIndexCursor(cursors.iterator()); @@ -209,7 +209,7 @@ public class H2PkHashIndex extends GridH2IndexBase { int part = store.partId(); if (partsFilter == null || partsFilter.applyPartition(part)) - cursors.add(store.cursor(cctx.cacheId(), false)); + cursors.add(store.cursor(cctx.cacheId())); } Cursor pkHashCursor = new H2PkHashIndexCursor(cursors.iterator()); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java index 4ac6303a..4bd584f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/H2CacheRow.java @@ -231,11 +231,6 @@ public class H2CacheRow extends H2Row implements CacheDataRow { } /** {@inheritDoc} */ - @Override public void cacheId(int cacheId) { - row.cacheId(cacheId); - } - - /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { return row.mvccCoordinatorVersion(); }