From: agura@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Wed, 27 Sep 2017 17:52:38 -0000
Subject: [1/2] ignite git commit: ignite-6029 IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization.

Repository: ignite
Updated Branches:
  refs/heads/ignite-6029-master [created] 721df25fb


http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
new file mode 100644
index 0000000..2b55c5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; + +/** + * Record data V2 serializer. + */ +public class RecordDataV2Serializer implements RecordDataSerializer { + /** V1 data serializer delegate. */ + private final RecordDataV1Serializer delegateSerializer; + + /** + * Create an instance of V2 data serializer. + * + * @param delegateSerializer V1 data serializer. + */ + public RecordDataV2Serializer(RecordDataV1Serializer delegateSerializer) { + this.delegateSerializer = delegateSerializer; + } + + /** {@inheritDoc} */ + @Override public int size(WALRecord record) throws IgniteCheckedException { + if (record instanceof HeaderRecord) + throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer"); + + return delegateSerializer.size(record); + } + + /** {@inheritDoc} */ + @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + return delegateSerializer.readRecord(type, in); + } + + /** {@inheritDoc} */ + @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { + if (record instanceof HeaderRecord) + throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer"); + + delegateSerializer.writeRecord(record, buf); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index ce6fdc7..c4e1bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -21,84 +21,22 @@ import java.io.DataInput; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.WALPointer; -import org.apache.ignite.internal.pagemem.wal.record.CacheState; -import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; -import org.apache.ignite.internal.pagemem.wal.record.DataEntry; -import org.apache.ignite.internal.pagemem.wal.record.DataRecord; -import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; -import 
org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; -import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; -import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; -import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.FixLeftmostChildRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.FixRemoveId; -import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.InnerReplaceRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MergeRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageAddRootRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageCutRootRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootInlineRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastAllocatedIndex; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulFullSnapshotId; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; -import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.RemoveRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.ReplaceRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.SplitExistingPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecord; -import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord; -import org.apache.ignite.internal.processors.cache.CacheObject; -import 
org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; +import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; -import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; @@ -116,1328 +54,97 @@ public class RecordV1Serializer implements RecordSerializer { /** Length of Type */ public static final int REC_TYPE_SIZE = 1; - /** Length of WAL Pointer */ - public static final int FILE_WAL_POINTER_SIZE = 12; + /** Length of WAL Pointer: Index (8) + File offset (4). */ + public static final int FILE_WAL_POINTER_SIZE = 8 + 4; /** Length of CRC value */ - private static final int CRC_SIZE = 4; + public static final int CRC_SIZE = 4; - /** */ - public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + /*Magic*/8 + /*Version*/4 + CRC_SIZE; + /** Total length of HEADER record. */ + public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE; - /** Cache shared context */ - private GridCacheSharedContext cctx; - - /** Size of page used for PageMemory regions */ - private int pageSize; - - /** Cache object processor to reading {@link DataEntry DataEntries} */ - private IgniteCacheObjectProcessor co; - - /** Skip CRC calculation/check flag */ - private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false); - - /** Write pointer. */ - private final boolean writePointer; - - /** Serializer of {@link TxRecord} records. */ - private TxRecordSerializer txRecordSerializer; - - /** - * @param cctx Cache shared context. - */ - public RecordV1Serializer(GridCacheSharedContext cctx) { - this(cctx, false); - } - - /** - * @param cctx Cache shared context. 
- */ - public RecordV1Serializer(GridCacheSharedContext cctx, boolean writePointer) { - this.cctx = cctx; - this.writePointer = writePointer; - - co = cctx.kernalContext().cacheObjects(); - pageSize = cctx.database().pageSize(); - txRecordSerializer = new TxRecordSerializer(cctx); - } - - /** {@inheritDoc} */ - @Override public int version() { - return 1; - } - - /** {@inheritDoc} */ - @Override public boolean writePointer() { - return writePointer; - } - - /** {@inheritDoc} */ - @SuppressWarnings("CastConflictsWithInstanceof") - @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { - assert record.size() > 0 && buf.remaining() >= record.size() : record.size(); - - int startPos = buf.position(); - - buf.put((byte)(record.type().ordinal() + 1)); - - putPosition(buf, (FileWALPointer)record.position()); - - switch (record.type()) { - case PAGE_RECORD: - PageSnapshot snap = (PageSnapshot)record; - - buf.putInt(snap.fullPageId().groupId()); - buf.putLong(snap.fullPageId().pageId()); - buf.put(snap.pageData()); - - break; - - case MEMORY_RECOVERY: - MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record; - - buf.putLong(memoryRecoveryRecord.time()); - - break; - - case PARTITION_DESTROY: - PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record; - - buf.putInt(partDestroy.groupId()); - buf.putInt(partDestroy.partitionId()); - - break; - - case META_PAGE_INIT: - MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record; - - buf.putInt(updRootsRec.groupId()); - buf.putLong(updRootsRec.pageId()); - - buf.putShort((short)updRootsRec.ioType()); - buf.putShort((short)updRootsRec.ioVersion()); - buf.putLong(updRootsRec.treeRoot()); - buf.putLong(updRootsRec.reuseListRoot()); - - break; - - case PARTITION_META_PAGE_UPDATE_COUNTERS: - MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record; - - buf.putInt(partDataRec.groupId()); - buf.putLong(partDataRec.pageId()); - - buf.putLong(partDataRec.updateCounter()); - buf.putLong(partDataRec.globalRemoveId()); - buf.putInt(partDataRec.partitionSize()); - buf.putLong(partDataRec.countersPageId()); - buf.put(partDataRec.state()); - buf.putInt(partDataRec.allocatedIndexCandidate()); - - break; - - case CHECKPOINT_RECORD: - CheckpointRecord cpRec = (CheckpointRecord)record; - - assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer : - "Invalid WAL record: " + cpRec; - - FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); - UUID cpId = cpRec.checkpointId(); - - buf.putLong(cpId.getMostSignificantBits()); - buf.putLong(cpId.getLeastSignificantBits()); - - buf.put(walPtr == null ? (byte)0 : 1); - - if (walPtr != null) { - buf.putLong(walPtr.index()); - buf.putInt(walPtr.fileOffset()); - buf.putInt(walPtr.length()); - } - - putCacheStates(buf, cpRec.cacheGroupStates()); - - buf.put(cpRec.end() ? 
(byte)1 : 0); - - break; - - case DATA_RECORD: - DataRecord dataRec = (DataRecord)record; - - buf.putInt(dataRec.writeEntries().size()); - - for (DataEntry dataEntry : dataRec.writeEntries()) - putDataEntry(buf, dataEntry); - - break; - - case HEADER_RECORD: - buf.putLong(HeaderRecord.MAGIC); - - buf.putInt(((HeaderRecord)record).version()); - - break; - - case DATA_PAGE_INSERT_RECORD: - DataPageInsertRecord diRec = (DataPageInsertRecord)record; - - buf.putInt(diRec.groupId()); - buf.putLong(diRec.pageId()); - - buf.putShort((short)diRec.payload().length); - - buf.put(diRec.payload()); - - break; - - case DATA_PAGE_UPDATE_RECORD: - DataPageUpdateRecord uRec = (DataPageUpdateRecord)record; - - buf.putInt(uRec.groupId()); - buf.putLong(uRec.pageId()); - buf.putInt(uRec.itemId()); - - buf.putShort((short)uRec.payload().length); - - buf.put(uRec.payload()); - - break; - - case DATA_PAGE_INSERT_FRAGMENT_RECORD: - final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record; - - buf.putInt(difRec.groupId()); - buf.putLong(difRec.pageId()); - - buf.putLong(difRec.lastLink()); - buf.putInt(difRec.payloadSize()); - buf.put(difRec.payload()); - - break; - - case DATA_PAGE_REMOVE_RECORD: - DataPageRemoveRecord drRec = (DataPageRemoveRecord)record; - - buf.putInt(drRec.groupId()); - buf.putLong(drRec.pageId()); - - buf.put((byte)drRec.itemId()); - - break; - - case DATA_PAGE_SET_FREE_LIST_PAGE: - DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record; - - buf.putInt(freeListRec.groupId()); - buf.putLong(freeListRec.pageId()); - - buf.putLong(freeListRec.freeListPage()); - - break; - - case INIT_NEW_PAGE_RECORD: - InitNewPageRecord inpRec = (InitNewPageRecord)record; - - buf.putInt(inpRec.groupId()); - buf.putLong(inpRec.pageId()); - - buf.putShort((short)inpRec.ioType()); - buf.putShort((short)inpRec.ioVersion()); - buf.putLong(inpRec.newPageId()); - - break; - - case BTREE_META_PAGE_INIT_ROOT: - MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record; - - buf.putInt(imRec.groupId()); - buf.putLong(imRec.pageId()); - - buf.putLong(imRec.rootId()); - - break; - - case BTREE_META_PAGE_INIT_ROOT2: - MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record; - - buf.putInt(imRec2.groupId()); - buf.putLong(imRec2.pageId()); - - buf.putLong(imRec2.rootId()); - - buf.putShort((short)imRec2.inlineSize()); - break; - - case BTREE_META_PAGE_ADD_ROOT: - MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record; - - buf.putInt(arRec.groupId()); - buf.putLong(arRec.pageId()); - - buf.putLong(arRec.rootId()); - - break; - - case BTREE_META_PAGE_CUT_ROOT: - MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record; - - buf.putInt(crRec.groupId()); - buf.putLong(crRec.pageId()); - - break; - - case BTREE_INIT_NEW_ROOT: - NewRootInitRecord riRec = (NewRootInitRecord)record; - - buf.putInt(riRec.groupId()); - buf.putLong(riRec.pageId()); - - buf.putLong(riRec.rootId()); - buf.putShort((short)riRec.io().getType()); - buf.putShort((short)riRec.io().getVersion()); - buf.putLong(riRec.leftId()); - buf.putLong(riRec.rightId()); - - putRow(buf, riRec.rowBytes()); - - break; - - case BTREE_PAGE_RECYCLE: - RecycleRecord recRec = (RecycleRecord)record; - - buf.putInt(recRec.groupId()); - buf.putLong(recRec.pageId()); - - buf.putLong(recRec.newPageId()); - - break; - - case BTREE_PAGE_INSERT: - InsertRecord inRec = (InsertRecord)record; - - buf.putInt(inRec.groupId()); - buf.putLong(inRec.pageId()); - - buf.putShort((short)inRec.io().getType()); - 
buf.putShort((short)inRec.io().getVersion()); - buf.putShort((short)inRec.index()); - buf.putLong(inRec.rightId()); - - putRow(buf, inRec.rowBytes()); - - break; - - case BTREE_FIX_LEFTMOST_CHILD: - FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record; - - buf.putInt(flRec.groupId()); - buf.putLong(flRec.pageId()); - - buf.putLong(flRec.rightId()); - - break; - - case BTREE_FIX_COUNT: - FixCountRecord fcRec = (FixCountRecord)record; - - buf.putInt(fcRec.groupId()); - buf.putLong(fcRec.pageId()); - - buf.putShort((short)fcRec.count()); - - break; - - case BTREE_PAGE_REPLACE: - ReplaceRecord rRec = (ReplaceRecord)record; - - buf.putInt(rRec.groupId()); - buf.putLong(rRec.pageId()); - - buf.putShort((short)rRec.io().getType()); - buf.putShort((short)rRec.io().getVersion()); - buf.putShort((short)rRec.index()); - - putRow(buf, rRec.rowBytes()); - - break; - - case BTREE_PAGE_REMOVE: - RemoveRecord rmRec = (RemoveRecord)record; - - buf.putInt(rmRec.groupId()); - buf.putLong(rmRec.pageId()); - - buf.putShort((short)rmRec.index()); - buf.putShort((short)rmRec.count()); - - break; - - case BTREE_PAGE_INNER_REPLACE: - InnerReplaceRecord irRec = (InnerReplaceRecord)record; - - buf.putInt(irRec.groupId()); - buf.putLong(irRec.pageId()); - - buf.putShort((short)irRec.destinationIndex()); - buf.putLong(irRec.sourcePageId()); - buf.putShort((short)irRec.sourceIndex()); - buf.putLong(irRec.removeId()); - - break; - - case BTREE_FORWARD_PAGE_SPLIT: - SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record; - - buf.putInt(sfRec.groupId()); - buf.putLong(sfRec.pageId()); - - buf.putLong(sfRec.forwardId()); - buf.putShort((short)sfRec.ioType()); - buf.putShort((short)sfRec.ioVersion()); - buf.putLong(sfRec.sourcePageId()); - buf.putShort((short)sfRec.middleIndex()); - buf.putShort((short)sfRec.count()); - - break; - - case BTREE_EXISTING_PAGE_SPLIT: - SplitExistingPageRecord seRec = (SplitExistingPageRecord)record; - - buf.putInt(seRec.groupId()); - buf.putLong(seRec.pageId()); - - buf.putShort((short)seRec.middleIndex()); - buf.putLong(seRec.forwardId()); - - break; - - case BTREE_PAGE_MERGE: - MergeRecord mRec = (MergeRecord)record; - - buf.putInt(mRec.groupId()); - buf.putLong(mRec.pageId()); - - buf.putLong(mRec.parentId()); - buf.putShort((short)mRec.parentIndex()); - buf.putLong(mRec.rightId()); - buf.put((byte)(mRec.isEmptyBranch() ? 
1 : 0)); - - break; - - case PAGES_LIST_SET_NEXT: - PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record; - - buf.putInt(plNextRec.groupId()); - buf.putLong(plNextRec.pageId()); - - buf.putLong(plNextRec.nextPageId()); - - break; - - case PAGES_LIST_SET_PREVIOUS: - PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record; - - buf.putInt(plPrevRec.groupId()); - buf.putLong(plPrevRec.pageId()); - - buf.putLong(plPrevRec.previousPageId()); - - break; - - case PAGES_LIST_INIT_NEW_PAGE: - PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record; - - buf.putInt(plNewRec.groupId()); - buf.putLong(plNewRec.pageId()); - buf.putInt(plNewRec.ioType()); - buf.putInt(plNewRec.ioVersion()); - buf.putLong(plNewRec.newPageId()); - - buf.putLong(plNewRec.previousPageId()); - buf.putLong(plNewRec.dataPageId()); - - break; - - case PAGES_LIST_ADD_PAGE: - PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record; - - buf.putInt(plAddRec.groupId()); - buf.putLong(plAddRec.pageId()); - - buf.putLong(plAddRec.dataPageId()); - - break; - - case PAGES_LIST_REMOVE_PAGE: - PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record; - - buf.putInt(plRmvRec.groupId()); - buf.putLong(plRmvRec.pageId()); - - buf.putLong(plRmvRec.removedPageId()); - - break; - - case BTREE_FIX_REMOVE_ID: - FixRemoveId frRec = (FixRemoveId)record; - - buf.putInt(frRec.groupId()); - buf.putLong(frRec.pageId()); - - buf.putLong(frRec.removeId()); - - break; - - case TRACKING_PAGE_DELTA: - TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record; - - buf.putInt(tpDelta.groupId()); - buf.putLong(tpDelta.pageId()); - - buf.putLong(tpDelta.pageIdToMark()); - buf.putLong(tpDelta.nextSnapshotId()); - buf.putLong(tpDelta.lastSuccessfulSnapshotId()); - - break; - - case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: - MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record; - - buf.putInt(mpUpdateNextSnapshotId.groupId()); - buf.putLong(mpUpdateNextSnapshotId.pageId()); - - buf.putLong(mpUpdateNextSnapshotId.nextSnapshotId()); - - break; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: - MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId = - (MetaPageUpdateLastSuccessfulFullSnapshotId)record; - - buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId()); - buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId()); - - buf.putLong(mpUpdateLastSuccFullSnapshotId.lastSuccessfulFullSnapshotId()); - - break; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: - MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId = - (MetaPageUpdateLastSuccessfulSnapshotId)record; - - buf.putInt(mpUpdateLastSuccSnapshotId.groupId()); - buf.putLong(mpUpdateLastSuccSnapshotId.pageId()); - - buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotId()); - buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotTag()); - - break; - - case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: - MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx = - (MetaPageUpdateLastAllocatedIndex) record; - - buf.putInt(mpUpdateLastAllocatedIdx.groupId()); - buf.putLong(mpUpdateLastAllocatedIdx.pageId()); - - buf.putInt(mpUpdateLastAllocatedIdx.lastAllocatedIndex()); - - break; - - case PART_META_UPDATE_STATE: - PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record; - - buf.putInt(partMetaStateRecord.groupId()); - - buf.putInt(partMetaStateRecord.partitionId()); - - buf.put(partMetaStateRecord.state()); - - 
buf.putLong(partMetaStateRecord.updateCounter()); - - break; - - case PAGE_LIST_META_RESET_COUNT_RECORD: - PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record; - - buf.putInt(pageListMetaResetCntRecord.groupId()); - buf.putLong(pageListMetaResetCntRecord.pageId()); - - break; - - case TX_RECORD: - txRecordSerializer.writeTxRecord((TxRecord) record, buf); - - break; - - default: - throw new UnsupportedOperationException("Type: " + record.type()); - } - - if (!skipCrc) { - int curPos = buf.position(); - - buf.position(startPos); - - // This call will move buffer position to the end of the record again. - int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos); - - buf.putInt(crcVal); - } - else - buf.putInt(0); - } - - /** {@inheritDoc} */ - @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException { - long startPos = -1; - - try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { - startPos = in0.position(); - - WALRecord res = readRecord(in, expPtr); - - assert res != null; - - int size = (int) (in0.position() - startPos + CRC_SIZE); - - res.size(size); // Account for CRC which will be read afterwards. - - if (writePointer && expPtr instanceof FileWALPointer) { - FileWALPointer ptr = (FileWALPointer) expPtr; - - res.position(new FileWALPointer(ptr.index(), ptr.fileOffset(), size)); - } - - return res; - } - catch (EOFException | SegmentEofException e) { - throw e; - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos, e); - } - } - - /** - * Loads record from input, does not read CRC value - * - * @param in Input to read record from - * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file - * @throws SegmentEofException if end of WAL segment reached - */ - private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { - int type = in.readUnsignedByte(); - - if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) - throw new SegmentEofException("Reached logical end of the segment", null); - - FileWALPointer ptr = readPosition(in); - - if (!F.eq(ptr, expPtr)) - throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr + - ", readPtr=" + ptr + ']', null); - - RecordType recType = RecordType.fromOrdinal(type - 1); - - if (recType == null) - throw new IOException("Unknown record type: " + type); - - WALRecord res; - - switch (recType) { - case PAGE_RECORD: - byte[] arr = new byte[pageSize]; - - int cacheId = in.readInt(); - long pageId = in.readLong(); - - in.readFully(arr); - - res = new PageSnapshot(new FullPageId(pageId, cacheId), arr); - - break; - - case CHECKPOINT_RECORD: - long msb = in.readLong(); - long lsb = in.readLong(); - boolean hasPtr = in.readByte() != 0; - int idx = hasPtr ? in.readInt() : 0; - int offset = hasPtr ? in.readInt() : 0; - int len = hasPtr ? in.readInt() : 0; - - Map states = readPartitionStates(in); - - boolean end = in.readByte() != 0; - - FileWALPointer walPtr = hasPtr ? 
new FileWALPointer(idx, offset, len) : null; - - CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end); - - cpRec.cacheGroupStates(states); - - res = cpRec; - - break; - - case META_PAGE_INIT: - cacheId = in.readInt(); - pageId = in.readLong(); - - int ioType = in.readUnsignedShort(); - int ioVer = in.readUnsignedShort(); - long treeRoot = in.readLong(); - long reuseListRoot = in.readLong(); - - res = new MetaPageInitRecord(cacheId, pageId, ioType, ioVer, treeRoot, reuseListRoot); - - break; - - case PARTITION_META_PAGE_UPDATE_COUNTERS: - cacheId = in.readInt(); - pageId = in.readLong(); - - long updCntr = in.readLong(); - long rmvId = in.readLong(); - int partSize = in.readInt(); - long countersPageId = in.readLong(); - byte state = in.readByte(); - int allocatedIdxCandidate = in.readInt(); - - res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate); - - break; - - case MEMORY_RECOVERY: - long ts = in.readLong(); - - res = new MemoryRecoveryRecord(ts); - - break; - - case PARTITION_DESTROY: - cacheId = in.readInt(); - int partId = in.readInt(); - - res = new PartitionDestroyRecord(cacheId, partId); - - break; - - case DATA_RECORD: - int entryCnt = in.readInt(); - - List entries = new ArrayList<>(entryCnt); - - for (int i = 0; i < entryCnt; i++) - entries.add(readDataEntry(in)); - - res = new DataRecord(entries); - - break; - - case HEADER_RECORD: - long magic = in.readLong(); - - if (magic != HeaderRecord.MAGIC) - throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + - ", actual=" + U.hexLong(magic) + ']'); - - int ver = in.readInt(); - - res = new HeaderRecord(ver); - - break; - - case DATA_PAGE_INSERT_RECORD: { - cacheId = in.readInt(); - pageId = in.readLong(); - - int size = in.readUnsignedShort(); - - in.ensure(size); - - byte[] payload = new byte[size]; - - in.readFully(payload); - - res = new DataPageInsertRecord(cacheId, pageId, payload); - - break; - } - - case DATA_PAGE_UPDATE_RECORD: { - cacheId = in.readInt(); - pageId = in.readLong(); - - int itemId = in.readInt(); - - int size = in.readUnsignedShort(); - - in.ensure(size); - - byte[] payload = new byte[size]; - - in.readFully(payload); - - res = new DataPageUpdateRecord(cacheId, pageId, itemId, payload); - - break; - } - - case DATA_PAGE_INSERT_FRAGMENT_RECORD: { - cacheId = in.readInt(); - pageId = in.readLong(); - - final long lastLink = in.readLong(); - final int payloadSize = in.readInt(); - - final byte[] payload = new byte[payloadSize]; - - in.readFully(payload); - - res = new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink); - - break; - } - - case DATA_PAGE_REMOVE_RECORD: - cacheId = in.readInt(); - pageId = in.readLong(); - - int itemId = in.readUnsignedByte(); - - res = new DataPageRemoveRecord(cacheId, pageId, itemId); - - break; - - case DATA_PAGE_SET_FREE_LIST_PAGE: - cacheId = in.readInt(); - pageId = in.readLong(); - - long freeListPage = in.readLong(); - - res = new DataPageSetFreeListPageRecord(cacheId, pageId, freeListPage); - - break; - - case INIT_NEW_PAGE_RECORD: - cacheId = in.readInt(); - pageId = in.readLong(); - - ioType = in.readUnsignedShort(); - ioVer = in.readUnsignedShort(); - long virtualPageId = in.readLong(); - - res = new InitNewPageRecord(cacheId, pageId, ioType, ioVer, virtualPageId); - - break; - - case BTREE_META_PAGE_INIT_ROOT: - cacheId = in.readInt(); - pageId = in.readLong(); - - long rootId = in.readLong(); - - res = new 
MetaPageInitRootRecord(cacheId, pageId, rootId); - - break; - - case BTREE_META_PAGE_INIT_ROOT2: - cacheId = in.readInt(); - pageId = in.readLong(); - - long rootId2 = in.readLong(); - int inlineSize = in.readShort(); - - res = new MetaPageInitRootInlineRecord(cacheId, pageId, rootId2, inlineSize); - - break; - - case BTREE_META_PAGE_ADD_ROOT: - cacheId = in.readInt(); - pageId = in.readLong(); - - rootId = in.readLong(); - - res = new MetaPageAddRootRecord(cacheId, pageId, rootId); - - break; - - case BTREE_META_PAGE_CUT_ROOT: - cacheId = in.readInt(); - pageId = in.readLong(); - - res = new MetaPageCutRootRecord(cacheId, pageId); - - break; - - case BTREE_INIT_NEW_ROOT: - cacheId = in.readInt(); - pageId = in.readLong(); - - rootId = in.readLong(); - ioType = in.readUnsignedShort(); - ioVer = in.readUnsignedShort(); - long leftId = in.readLong(); - long rightId = in.readLong(); - - BPlusIO io = BPlusIO.getBPlusIO(ioType, ioVer); - - byte[] rowBytes = new byte[io.getItemSize()]; - - in.readFully(rowBytes); - - res = new NewRootInitRecord<>(cacheId, pageId, rootId, (BPlusInnerIO)io, leftId, rowBytes, rightId); - - break; - - case BTREE_PAGE_RECYCLE: - cacheId = in.readInt(); - pageId = in.readLong(); - - long newPageId = in.readLong(); - - res = new RecycleRecord(cacheId, pageId, newPageId); - - break; - - case BTREE_PAGE_INSERT: - cacheId = in.readInt(); - pageId = in.readLong(); - - ioType = in.readUnsignedShort(); - ioVer = in.readUnsignedShort(); - int itemIdx = in.readUnsignedShort(); - rightId = in.readLong(); - - io = BPlusIO.getBPlusIO(ioType, ioVer); - - rowBytes = new byte[io.getItemSize()]; - - in.readFully(rowBytes); - - res = new InsertRecord<>(cacheId, pageId, io, itemIdx, rowBytes, rightId); - - break; - - case BTREE_FIX_LEFTMOST_CHILD: - cacheId = in.readInt(); - pageId = in.readLong(); - - rightId = in.readLong(); - - res = new FixLeftmostChildRecord(cacheId, pageId, rightId); - - break; - - case BTREE_FIX_COUNT: - cacheId = in.readInt(); - pageId = in.readLong(); - - int cnt = in.readUnsignedShort(); - - res = new FixCountRecord(cacheId, pageId, cnt); - - break; - - case BTREE_PAGE_REPLACE: - cacheId = in.readInt(); - pageId = in.readLong(); - - ioType = in.readUnsignedShort(); - ioVer = in.readUnsignedShort(); - itemIdx = in.readUnsignedShort(); - - io = BPlusIO.getBPlusIO(ioType, ioVer); - - rowBytes = new byte[io.getItemSize()]; - - in.readFully(rowBytes); - - res = new ReplaceRecord<>(cacheId, pageId, io, rowBytes, itemIdx); - - break; - - case BTREE_PAGE_REMOVE: - cacheId = in.readInt(); - pageId = in.readLong(); - - itemIdx = in.readUnsignedShort(); - cnt = in.readUnsignedShort(); - - res = new RemoveRecord(cacheId, pageId, itemIdx, cnt); - - break; - - case BTREE_PAGE_INNER_REPLACE: - cacheId = in.readInt(); - pageId = in.readLong(); - - int dstIdx = in.readUnsignedShort(); - long srcPageId = in.readLong(); - int srcIdx = in.readUnsignedShort(); - rmvId = in.readLong(); - - res = new InnerReplaceRecord<>(cacheId, pageId, dstIdx, srcPageId, srcIdx, rmvId); - - break; - - case BTREE_FORWARD_PAGE_SPLIT: - cacheId = in.readInt(); - pageId = in.readLong(); - - long fwdId = in.readLong(); - ioType = in.readUnsignedShort(); - ioVer = in.readUnsignedShort(); - srcPageId = in.readLong(); - int mid = in.readUnsignedShort(); - cnt = in.readUnsignedShort(); - - res = new SplitForwardPageRecord(cacheId, pageId, fwdId, ioType, ioVer, srcPageId, mid, cnt); - - break; - - case BTREE_EXISTING_PAGE_SPLIT: - cacheId = in.readInt(); - pageId = in.readLong(); - - mid = 
in.readUnsignedShort(); - fwdId = in.readLong(); - - res = new SplitExistingPageRecord(cacheId, pageId, mid, fwdId); - - break; - - case BTREE_PAGE_MERGE: - cacheId = in.readInt(); - pageId = in.readLong(); - - long prntId = in.readLong(); - int prntIdx = in.readUnsignedShort(); - rightId = in.readLong(); - boolean emptyBranch = in.readBoolean(); - - res = new MergeRecord<>(cacheId, pageId, prntId, prntIdx, rightId, emptyBranch); - - break; - - case BTREE_FIX_REMOVE_ID: - cacheId = in.readInt(); - pageId = in.readLong(); - - rmvId = in.readLong(); - - res = new FixRemoveId(cacheId, pageId, rmvId); - - break; - - case PAGES_LIST_SET_NEXT: - cacheId = in.readInt(); - pageId = in.readLong(); - long nextPageId = in.readLong(); - - res = new PagesListSetNextRecord(cacheId, pageId, nextPageId); - - break; - - case PAGES_LIST_SET_PREVIOUS: - cacheId = in.readInt(); - pageId = in.readLong(); - long prevPageId = in.readLong(); - - res = new PagesListSetPreviousRecord(cacheId, pageId, prevPageId); - - break; - - case PAGES_LIST_INIT_NEW_PAGE: - cacheId = in.readInt(); - pageId = in.readLong(); - ioType = in.readInt(); - ioVer = in.readInt(); - newPageId = in.readLong(); - prevPageId = in.readLong(); - long addDataPageId = in.readLong(); - - res = new PagesListInitNewPageRecord(cacheId, pageId, ioType, ioVer, newPageId, prevPageId, addDataPageId); - - break; - - case PAGES_LIST_ADD_PAGE: - cacheId = in.readInt(); - pageId = in.readLong(); - long dataPageId = in.readLong(); - - res = new PagesListAddPageRecord(cacheId, pageId, dataPageId); - - break; - - case PAGES_LIST_REMOVE_PAGE: - cacheId = in.readInt(); - pageId = in.readLong(); - long rmvdPageId = in.readLong(); - - res = new PagesListRemovePageRecord(cacheId, pageId, rmvdPageId); - - break; - - case TRACKING_PAGE_DELTA: - cacheId = in.readInt(); - pageId = in.readLong(); - - long pageIdToMark = in.readLong(); - long nextSnapshotId0 = in.readLong(); - long lastSuccessfulSnapshotId0 = in.readLong(); - - res = new TrackingPageDeltaRecord(cacheId, pageId, pageIdToMark, nextSnapshotId0, lastSuccessfulSnapshotId0); - - break; - - case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: - cacheId = in.readInt(); - pageId = in.readLong(); - - long nextSnapshotId = in.readLong(); - - res = new MetaPageUpdateNextSnapshotId(cacheId, pageId, nextSnapshotId); - - break; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: - cacheId = in.readInt(); - pageId = in.readLong(); - - long lastSuccessfulFullSnapshotId = in.readLong(); - - res = new MetaPageUpdateLastSuccessfulFullSnapshotId(cacheId, pageId, lastSuccessfulFullSnapshotId); - - break; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: - cacheId = in.readInt(); - pageId = in.readLong(); - - long lastSuccessfulSnapshotId = in.readLong(); - long lastSuccessfulSnapshotTag = in.readLong(); - - res = new MetaPageUpdateLastSuccessfulSnapshotId(cacheId, pageId, lastSuccessfulSnapshotId, lastSuccessfulSnapshotTag); + /** Skip CRC calculation/check flag */ + public static boolean SKIP_CRC = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false); - break; + /** V1 data serializer. */ + private final RecordDataV1Serializer dataSerializer; - case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: - cacheId = in.readInt(); - pageId = in.readLong(); + /** Write pointer. */ + private final boolean writePointer; - int lastAllocatedIdx = in.readInt(); + /** Record read/write functional interface. 
*/ + private final RecordIO recordIO = new RecordIO() { - res = new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, lastAllocatedIdx); + /** {@inheritDoc} */ + @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException { + return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE; + } - break; + /** {@inheritDoc} */ + @Override public WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { + RecordType recType = readRecordType(in); - case PART_META_UPDATE_STATE: - cacheId = in.readInt(); - partId = in.readInt(); + if (recType == RecordType.SWITCH_SEGMENT_RECORD) + throw new SegmentEofException("Reached end of segment", null); - state = in.readByte(); + FileWALPointer ptr = readPosition(in); - long updateCounter = in.readLong(); + if (!F.eq(ptr, expPtr)) + throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr + + ", readPtr=" + ptr + ']', null); - res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter); + return dataSerializer.readRecord(recType, in); + } - break; + /** {@inheritDoc} */ + @Override public void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { + // Write record type. + putRecordType(buf, record); - case PAGE_LIST_META_RESET_COUNT_RECORD: - cacheId = in.readInt(); - pageId = in.readLong(); + // Write record file position. + putPositionOfRecord(buf, record); - res = new PageListMetaResetCountRecord(cacheId, pageId); - break; + // Write record data. + dataSerializer.writeRecord(record, buf); + } + }; - case SWITCH_SEGMENT_RECORD: - throw new EOFException("END OF SEGMENT"); + /** + * Create an instance of V1 serializer. + * + * @param dataSerializer V1 data serializer. + * @param writePointer Write pointer. 
+ */ + public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer) { + this.dataSerializer = dataSerializer; + this.writePointer = writePointer; + } - case TX_RECORD: - res = txRecordSerializer.readTxRecord(in); + /** {@inheritDoc} */ + @Override public int version() { + return 1; + } - break; + /** {@inheritDoc} */ + @Override public boolean writePointer() { + return writePointer; + } - default: - throw new UnsupportedOperationException("Type: " + recType); - } + /** {@inheritDoc} */ + @SuppressWarnings("CastConflictsWithInstanceof") + @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { + writeWithCrc(record, buf, recordIO); + } - return res; + /** {@inheritDoc} */ + @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException { + return readWithCrc(in0, expPtr, recordIO); } /** {@inheritDoc} */ @SuppressWarnings("CastConflictsWithInstanceof") @Override public int size(WALRecord record) throws IgniteCheckedException { - int commonFields = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE; - - switch (record.type()) { - case PAGE_RECORD: - assert record instanceof PageSnapshot; - - PageSnapshot pageRec = (PageSnapshot)record; - - return commonFields + pageRec.pageData().length + 12; - - case CHECKPOINT_RECORD: - CheckpointRecord cpRec = (CheckpointRecord)record; - - assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer : - "Invalid WAL record: " + cpRec; - - int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates()); - - FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); - - return commonFields + 18 + cacheStatesSize + (walPtr == null ? 0 : 16); - - case META_PAGE_INIT: - return commonFields + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8; - - case PARTITION_META_PAGE_UPDATE_COUNTERS: - return commonFields + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1 - + /*allocatedIdxCandidate*/ 4; - - case MEMORY_RECOVERY: - return commonFields + 8; - - case PARTITION_DESTROY: - return commonFields + /*cacheId*/4 + /*partId*/4; - - case DATA_RECORD: - DataRecord dataRec = (DataRecord)record; - - return commonFields + 4 + dataSize(dataRec); - - case HEADER_RECORD: - return HEADER_RECORD_SIZE; - - case DATA_PAGE_INSERT_RECORD: - DataPageInsertRecord diRec = (DataPageInsertRecord)record; - - return commonFields + 4 + 8 + 2 + diRec.payload().length; - - case DATA_PAGE_UPDATE_RECORD: - DataPageUpdateRecord uRec = (DataPageUpdateRecord)record; - - return commonFields + 4 + 8 + 2 + 4 + - uRec.payload().length; - - case DATA_PAGE_INSERT_FRAGMENT_RECORD: - final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record; - - return commonFields + 4 + 8 + 8 + 4 + difRec.payloadSize(); - - case DATA_PAGE_REMOVE_RECORD: - return commonFields + 4 + 8 + 1; - - case DATA_PAGE_SET_FREE_LIST_PAGE: - return commonFields + 4 + 8 + 8; - - case INIT_NEW_PAGE_RECORD: - return commonFields + 4 + 8 + 2 + 2 + 8; - - case BTREE_META_PAGE_INIT_ROOT: - return commonFields + 4 + 8 + 8; - - case BTREE_META_PAGE_INIT_ROOT2: - return commonFields + 4 + 8 + 8 + 2; - - case BTREE_META_PAGE_ADD_ROOT: - return commonFields + 4 + 8 + 8; - - case BTREE_META_PAGE_CUT_ROOT: - return commonFields + 4 + 8; - - case BTREE_INIT_NEW_ROOT: - NewRootInitRecord riRec = (NewRootInitRecord)record; - - return commonFields + 4 + 8 + 8 + 2 + 2 + 8 
+ 8 + riRec.io().getItemSize(); - - case BTREE_PAGE_RECYCLE: - return commonFields + 4 + 8 + 8; - - case BTREE_PAGE_INSERT: - InsertRecord inRec = (InsertRecord)record; - - return commonFields + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize(); - - case BTREE_FIX_LEFTMOST_CHILD: - return commonFields + 4 + 8 + 8; - - case BTREE_FIX_COUNT: - return commonFields + 4 + 8 + 2; - - case BTREE_PAGE_REPLACE: - ReplaceRecord rRec = (ReplaceRecord)record; - - return commonFields + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize(); - - case BTREE_PAGE_REMOVE: - return commonFields + 4 + 8 + 2 + 2; - - case BTREE_PAGE_INNER_REPLACE: - return commonFields + 4 + 8 + 2 + 8 + 2 + 8; - - case BTREE_FORWARD_PAGE_SPLIT: - return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2; - - case BTREE_EXISTING_PAGE_SPLIT: - return commonFields + 4 + 8 + 2 + 8; - - case BTREE_PAGE_MERGE: - return commonFields + 4 + 8 + 8 + 2 + 8 + 1; - - case BTREE_FIX_REMOVE_ID: - return commonFields + 4 + 8 + 8; - - case PAGES_LIST_SET_NEXT: - return commonFields + 4 + 8 + 8; - - case PAGES_LIST_SET_PREVIOUS: - return commonFields + 4 + 8 + 8; - - case PAGES_LIST_INIT_NEW_PAGE: - return commonFields + 4 + 8 + 4 + 4 + 8 + 8 + 8; - - case PAGES_LIST_ADD_PAGE: - return commonFields + 4 + 8 + 8; - - case PAGES_LIST_REMOVE_PAGE: - return commonFields + 4 + 8 + 8; - - case TRACKING_PAGE_DELTA: - return commonFields + 4 + 8 + 8 + 8 + 8; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: - return commonFields + 4 + 8 + 8 + 8; - - case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: - return commonFields + 4 + 8 + 8; - - case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: - return commonFields + 4 + 8 + 8; - - case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: - return commonFields + 4 + 8 + 4; - - case PART_META_UPDATE_STATE: - return commonFields + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8; - - case PAGE_LIST_META_RESET_COUNT_RECORD: - return commonFields + /*cacheId*/ 4 + /*pageId*/ 8; - - case SWITCH_SEGMENT_RECORD: - return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead - - case TX_RECORD: - return commonFields + txRecordSerializer.sizeOfTxRecord((TxRecord) record); - - default: - throw new UnsupportedOperationException("Type: " + record.type()); - } + return recordIO.sizeWithHeaders(record); } /** @@ -1463,213 +170,106 @@ public class RecordV1Serializer implements RecordSerializer { } /** - * @param dataRec Data record to serialize. - * @return Full data record size. - * @throws IgniteCheckedException If failed to obtain the length of one of the entries. - */ - private int dataSize(DataRecord dataRec) throws IgniteCheckedException { - int sz = 0; - - for (DataEntry entry : dataRec.writeEntries()) - sz += entrySize(entry); - - return sz; - } - - /** - * @param entry Entry to get size for. - * @return Entry size. - * @throws IgniteCheckedException If failed to get key or value bytes length. + * Writes record file position to given {@code buf}. + * + * @param buf Buffer to write record file position. + * @param record WAL record. */ - private int entrySize(DataEntry entry) throws IgniteCheckedException { - GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId()); - CacheObjectContext coCtx = cctx.cacheObjectContext(); - - return - /*cache ID*/4 + - /*key*/entry.key().valueBytesLength(coCtx) + - /*value*/(entry.value() == null ? 
4 : entry.value().valueBytesLength(coCtx)) + - /*op*/1 + - /*near xid ver*/CacheVersionIO.size(entry.nearXidVersion(), true) + - /*write ver*/CacheVersionIO.size(entry.writeVersion(), false) + - /*part ID*/4 + - /*expire Time*/8 + - /*part cnt*/8; + public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) { + putPosition(buf, (FileWALPointer) record.position()); } /** - * @param states Partition states. - * @return Size required to write partition states. + * Writes record type to given {@code buf}. + * + * @param buf Buffer to write record type. + * @param record WAL record. */ - private int cacheStatesSize(Map states) { - // Need 4 bytes for the number of caches. - int size = 2; - - for (Map.Entry entry : states.entrySet()) { - // Cache ID. - size += 4; - - // Need 2 bytes for the number of partitions. - size += 2; - - CacheState state = entry.getValue(); - - // 2 bytes partition ID, size and counter per partition. - size += 18 * state.size(); - } - - return size; + public static void putRecordType(ByteBuffer buf, WALRecord record) { + buf.put((byte)(record.type().ordinal() + 1)); } /** - * @param buf Buffer to write to. - * @param entry Data entry. + * Reads record type from given {@code in}. + * + * @param in Buffer to read record type. + * @return Record type. + * @throws IgniteCheckedException If logical end of segment is reached. + * @throws IOException In case of I/O problems. */ - private void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException { - buf.putInt(entry.cacheId()); - - if (!entry.key().putValue(buf)) - throw new AssertionError(); + public static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException { + int type = in.readUnsignedByte(); - if (entry.value() == null) - buf.putInt(-1); - else if (!entry.value().putValue(buf)) - throw new AssertionError(); + if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) + throw new SegmentEofException("Reached logical end of the segment", null); - buf.put((byte)entry.op().ordinal()); + RecordType recType = RecordType.fromOrdinal(type - 1); - putVersion(buf, entry.nearXidVersion(), true); - putVersion(buf, entry.writeVersion(), false); + if (recType == null) + throw new IOException("Unknown record type: " + type); - buf.putInt(entry.partitionId()); - buf.putLong(entry.partitionCounter()); - buf.putLong(entry.expireTime()); + return recType; } /** - * @param states Cache states. + * Reads record from file {@code in0} and validates CRC of record. + * + * @param in0 File input. + * @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file. + * @param reader Record reader I/O interface. + * @return WAL record. + * @throws EOFException In case of end of file. + * @throws IgniteCheckedException If it's unable to read record. */ - private void putCacheStates(ByteBuffer buf, Map states) { - buf.putShort((short)states.size()); + public static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException { + long startPos = -1; - for (Map.Entry entry : states.entrySet()) { - buf.putInt(entry.getKey()); + try (FileInput.Crc32CheckingFileInput in = in0.startRead(SKIP_CRC)) { + startPos = in0.position(); - CacheState state = entry.getValue(); + WALRecord res = reader.readWithHeaders(in, expPtr); - // Need 2 bytes for the number of partitions. 
- buf.putShort((short)state.size()); + assert res != null; - for (int i = 0; i < state.size(); i++) { - buf.putShort((short)state.partitionByIndex(i)); + res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards. - buf.putLong(state.partitionSizeByIndex(i)); - buf.putLong(state.partitionCounterByIndex(i)); - } + return res; } - } - - /** - * @param in Input to read from. - * @return Read entry. - */ - private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { - int cacheId = in.readInt(); - - int keySize = in.readInt(); - byte keyType = in.readByte(); - byte[] keyBytes = new byte[keySize]; - in.readFully(keyBytes); - - int valSize = in.readInt(); - - byte valType = 0; - byte[] valBytes = null; - - if (valSize >= 0) { - valType = in.readByte(); - valBytes = new byte[valSize]; - in.readFully(valBytes); + catch (EOFException | SegmentEofException | WalSegmentTailReachedException e) { + throw e; } - - byte ord = in.readByte(); - - GridCacheOperation op = GridCacheOperation.fromOrdinal(ord & 0xFF); - - GridCacheVersion nearXidVer = readVersion(in, true); - GridCacheVersion writeVer = readVersion(in, false); - - int partId = in.readInt(); - long partCntr = in.readLong(); - long expireTime = in.readLong(); - - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx != null) { - CacheObjectContext coCtx = cacheCtx.cacheObjectContext(); - - KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes); - CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null; - - return new DataEntry( - cacheId, - key, - val, - op, - nearXidVer, - writeVer, - expireTime, - partId, - partCntr - ); + catch (Exception e) { + throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos, e); } - else - return new LazyDataEntry( - cctx, - cacheId, - keyType, - keyBytes, - valType, - valBytes, - op, - nearXidVer, - writeVer, - expireTime, - partId, - partCntr); } /** - * @param buf Buffer to read from. - * @return Read map. + * Writes record with calculated CRC to buffer {@code buf}. + * + * @param record WAL record. + * @param buf Buffer to write. + * @param writer Record write I/O interface. + * @throws IgniteCheckedException If it's unable to write record. */ - private Map readPartitionStates(DataInput buf) throws IOException { - int caches = buf.readShort() & 0xFFFF; - - if (caches == 0) - return Collections.emptyMap(); + public static void writeWithCrc(WALRecord record, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException { + assert record.size() >= 0 && buf.remaining() >= record.size() : record.size(); - Map states = new HashMap<>(caches, 1.0f); - - for (int i = 0; i < caches; i++) { - int cacheId = buf.readInt(); + int startPos = buf.position(); - int parts = buf.readShort() & 0xFFFF; + writer.writeWithHeaders(record, buf); - CacheState state = new CacheState(parts); + if (!SKIP_CRC) { + int curPos = buf.position(); - for (int p = 0; p < parts; p++) { - int partId = buf.readShort() & 0xFFFF; - long size = buf.readLong(); - long partCntr = buf.readLong(); + buf.position(startPos); - state.addPartitionState(partId, size, partCntr); - } + // This call will move buffer position to the end of the record again. 
+ int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos); - states.put(cacheId, state); + buf.putInt(crcVal); } - - return states; + else + buf.putInt(0); } /** @@ -1703,15 +303,4 @@ public class RecordV1Serializer implements RecordSerializer { throw new IOException(e); } } - - /** - * @param buf Buffer. - * @param rowBytes Row bytes. - */ - @SuppressWarnings("unchecked") - private static void putRow(ByteBuffer buf, byte[] rowBytes) { - assert rowBytes.length > 0; - - buf.put(rowBytes); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java new file mode 100644 index 0000000..0a5bf01 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; +import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE; + +/** + * Record V2 serializer. + * Stores records in following format: + *
+ * <ul>
+ *     <li>Record type from {@link WALRecord.RecordType#ordinal()} incremented by 1</li>
+ *     <li>WAL pointer to double check consistency</li>
+ *     <li>Record length</li>
+ *     <li>Data</li>
+ *     <li>CRC or zero padding</li>
+ * </ul>
+ */ +public class RecordV2Serializer implements RecordSerializer { + /** Length of WAL Pointer: Index (8) + File offset (4) + Record length (4) */ + public static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4; + + /** V2 data serializer. */ + private final RecordDataV2Serializer dataSerializer; + + /** Write pointer. */ + private final boolean writePointer; + + /** Record read/write functional interface. */ + private final RecordIO recordIO = new RecordIO() { + + /** {@inheritDoc} */ + @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException { + return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE; + } + + /** {@inheritDoc} */ + @Override public WALRecord readWithHeaders( + ByteBufferBackedDataInput in, + WALPointer expPtr + ) throws IOException, IgniteCheckedException { + WALRecord.RecordType recType = RecordV1Serializer.readRecordType(in); + + if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD) + throw new SegmentEofException("Reached end of segment", null); + + FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr); + + return dataSerializer.readRecord(recType, in); + } + + /** {@inheritDoc} */ + @Override public void writeWithHeaders( + WALRecord record, + ByteBuffer buf + ) throws IgniteCheckedException { + // Write record type. + RecordV1Serializer.putRecordType(buf, record); + + // Write record file position. + putPositionOfRecord(buf, record); + + // Write record data. + dataSerializer.writeRecord(record, buf); + } + }; + + /** + * Create an instance of Record V2 serializer. + * + * @param dataSerializer V2 data serializer. + */ + public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer) { + this.dataSerializer = dataSerializer; + this.writePointer = writePointer; + } + + /** {@inheritDoc} */ + @Override public int version() { + return 2; + } + + /** {@inheritDoc} */ + @Override public boolean writePointer() { + return writePointer; + } + + /** {@inheritDoc} */ + @Override public int size(WALRecord record) throws IgniteCheckedException { + return recordIO.sizeWithHeaders(record); + } + + /** {@inheritDoc} */ + @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { + RecordV1Serializer.writeWithCrc(record, buf, recordIO); + } + + /** {@inheritDoc} */ + @Override public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { + return RecordV1Serializer.readWithCrc(in, expPtr, recordIO); + } + + /** + * @param in Data input to read pointer from. + * @return Read file WAL pointer. + * @throws IOException If failed to write. + */ + public static FileWALPointer readPositionAndCheckPoint( + DataInput in, + WALPointer expPtr + ) throws IgniteCheckedException, IOException { + long idx = in.readLong(); + int fileOffset = in.readInt(); + int length = in.readInt(); + + FileWALPointer p = (FileWALPointer)expPtr; + + if (!F.eq(idx, p.index()) || !F.eq(fileOffset, p.fileOffset())) + throw new WalSegmentTailReachedException( + "WAL segment tail is reached. [ " + + "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " + + "Actual state : {Index=" + idx + ",Offset=" + fileOffset + "} ]", null); + + return new FileWALPointer(idx, fileOffset, length); + } + + /** + * Writes record file position to given {@code buf}. + * + * @param buf Buffer to write record file position. + * @param record WAL record. 
+ */ + public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) { + FileWALPointer p = (FileWALPointer)record.position(); + + buf.putLong(p.index()); + buf.putInt(p.fileOffset()); + buf.putInt(record.size()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java new file mode 100644 index 0000000..d609e61 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; + +/** + * Internal interface to provide size, read and write operations of WAL records + * including record header and data. + */ +public interface RecordIO { + /** + * Calculates and returns size of record data and headers. + * + * @param record WAL record. + * @return Size in bytes. + * @throws IgniteCheckedException If it's unable to calculate size of record. + */ + int sizeWithHeaders(WALRecord record) throws IgniteCheckedException; + + /** + * Reads record data with headers from {@code in}. + * + * @param in Buffer to read. + * @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file. + * @return WAL record. + * @throws IOException In case of I/O problems. + * @throws IgniteCheckedException If it's unable to read record. + */ + WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; + + /** + * Writes record data with headers to {@code buf}. + * + * @param record WAL record. + * @param buf Buffer to write. + * @throws IgniteCheckedException If it's unable to write record. + */ + void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; +}
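For readers following the layout described in the RecordV2Serializer javadoc above, the sketch below is a minimal, self-contained illustration of the V2 record framing implied by sizeWithHeaders, writeWithHeaders and RecordV1Serializer.writeWithCrc: a type byte stored as ordinal + 1, an 8 + 4 + 4 byte WAL pointer (segment index, file offset, record length), the payload, then a 4-byte CRC over the whole record. It is not part of this commit: java.util.zip.CRC32 stands in for Ignite's PureJavaCrc32, and WalRecordV2LayoutSketch / writeRecord are hypothetical names used only for illustration.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class WalRecordV2LayoutSketch {
    /** Record type size in bytes. */
    static final int REC_TYPE_SIZE = 1;

    /** WAL pointer size: segment index (8) + file offset (4) + record length (4). */
    static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;

    /** CRC size in bytes. */
    static final int CRC_SIZE = 4;

    /** Frames one record in the V2 layout and appends a CRC computed over headers + payload. */
    static void writeRecord(ByteBuffer buf, int typeOrdinal, long segIdx, int fileOff, byte[] payload) {
        int startPos = buf.position();
        int recLen = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + payload.length + CRC_SIZE;

        buf.put((byte)(typeOrdinal + 1)); // Record type ordinal, stored incremented by 1.
        buf.putLong(segIdx);              // WAL segment index.
        buf.putInt(fileOff);              // Offset of the record within the segment.
        buf.putInt(recLen);               // Full record length, headers and CRC included.
        buf.put(payload);                 // Record payload (what the data serializer produces).

        // CRC over everything written for this record so far (headers + payload).
        ByteBuffer crcView = buf.duplicate();
        crcView.position(startPos);
        crcView.limit(buf.position());

        CRC32 crc = new CRC32();
        crc.update(crcView);

        buf.putInt((int)crc.getValue());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);

        writeRecord(buf, 7, 0L, 0, new byte[] {1, 2, 3, 4});

        System.out.println("Record bytes written: " + buf.position());
    }
}

Note that in the actual serializer the CRC is replaced with a zero int when SKIP_CRC is set, and the payload bytes are produced by RecordDataV2Serializer, which delegates everything except header records to the V1 data serializer.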