Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4D822200CFC for ; Thu, 28 Sep 2017 12:58:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4BF451609C2; Thu, 28 Sep 2017 10:58:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 036DE160BD7 for ; Thu, 28 Sep 2017 12:58:14 +0200 (CEST) Received: (qmail 81397 invoked by uid 500); 28 Sep 2017 10:58:14 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 81216 invoked by uid 99); 28 Sep 2017 10:58:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Sep 2017 10:58:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1ACC8F5BD4; Thu, 28 Sep 2017 10:58:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 28 Sep 2017 10:58:27 -0000 Message-Id: <83c875bbc5a742298955379ea077c84c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [17/19] ignite git commit: IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization. archived-at: Thu, 28 Sep 2017 10:58:17 -0000 IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization. 
Signed-off-by: Andrey Gura Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00770767 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00770767 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00770767 Branch: refs/heads/ignite-3478 Commit: 007707674e32b4123708e267feec04075a1b4663 Parents: b7bb792 Author: Pavel Kovalenko Authored: Thu Sep 28 12:15:19 2017 +0300 Committer: Andrey Gura Committed: Thu Sep 28 12:16:15 2017 +0300 ---------------------------------------------------------------------- .../pagemem/wal/record/SwitchSegmentRecord.java | 28 + .../wal/AbstractWalRecordsIterator.java | 54 +- .../wal/FileWriteAheadLogManager.java | 193 +- .../persistence/wal/RecordDataSerializer.java | 41 + .../wal/WalSegmentTailReachedException.java | 37 + .../wal/reader/IgniteWalIteratorFactory.java | 5 +- .../reader/StandaloneWalRecordsIterator.java | 27 +- .../wal/serializer/RecordDataV1Serializer.java | 1574 ++++++++++++++++ .../wal/serializer/RecordDataV2Serializer.java | 64 + .../wal/serializer/RecordV1Serializer.java | 1673 ++---------------- .../wal/serializer/RecordV2Serializer.java | 170 ++ .../persistence/wal/serializer/io/RecordIO.java | 60 + 12 files changed, 2305 insertions(+), 1621 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java new file mode 100644 index 0000000..948ec7e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SwitchSegmentRecord.java @@ -0,0 +1,28 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +/** + * Record is needed to mark end of segment. + */ +public class SwitchSegmentRecord extends WALRecord { + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.SWITCH_SEGMENT_RECORD; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index d5a2555..5be6e55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.NotNull; @@ -154,21 +153,31 @@ public abstract class AbstractWalRecordsIterator */ protected void advance() throws IgniteCheckedException { while (true) { - curRec = advanceRecord(currWalSegment); - - if (curRec != null) - return; - else { - currWalSegment = advanceSegment(currWalSegment); + try { + curRec = advanceRecord(currWalSegment); - if (currWalSegment == null) + if (curRec != null) return; + else { + currWalSegment = advanceSegment(currWalSegment); + + if (currWalSegment == null) + return; + } + } + catch (WalSegmentTailReachedException e) { + log.warning(e.getMessage()); + + curRec = null; + + return; } } } /** * Closes and returns WAL segment (if any) + * * @return closed handle * @throws IgniteCheckedException if IO failed */ @@ -199,7 +208,8 @@ public abstract class AbstractWalRecordsIterator * @return next advanced record */ private IgniteBiTuple advanceRecord( - @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) { + @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd + ) throws IgniteCheckedException { if (hnd == null) return null; @@ -217,8 +227,12 @@ public abstract class AbstractWalRecordsIterator return new IgniteBiTuple<>((WALPointer)ptr, postProcessRecord(rec)); } catch (IOException | IgniteCheckedException e) { + if (e instanceof WalSegmentTailReachedException) + throw (WalSegmentTailReachedException)e; + if (!(e instanceof SegmentEofException)) handleRecordException(e, ptr); + return null; } } @@ -261,24 +275,18 @@ public abstract class AbstractWalRecordsIterator FileIO 
fileIO = ioFactory.create(desc.file); try { - FileInput in = new FileInput(fileIO, buf); + int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO); - // Header record must be agnostic to the serializer version. - WALRecord rec = serializer.readRecord(in, - new FileWALPointer(desc.idx, (int)fileIO.position(), 0)); + RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer); - if (rec == null) - return null; - - if (rec.type() != WALRecord.RecordType.HEADER_RECORD) - throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile()); - - int ver = ((HeaderRecord)rec).version(); + FileInput in = new FileInput(fileIO, buf); - RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver, serializer.writePointer()); + if (start != null && desc.idx == start.index()) { + // Make sure we skip header with serializer version. + long startOffset = Math.max(start.fileOffset(), fileIO.position()); - if (start != null && desc.idx == start.index()) - in.seek(start.fileOffset()); + in.seek(startOffset); + } return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 9b2d948..c4582cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -62,8 +63,12 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridUnsafe; @@ -116,6 +121,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } }; + /** Latest serializer version to use. 
*/ + public static final int LATEST_SERIALIZER_VERSION = 1; + /** */ private final boolean alwaysWriteFullPages; @@ -152,9 +160,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** WAL archive directory (including consistent ID as subfolder) */ private File walArchiveDir; - /** Serializer of current version, used to read header record and for write records */ + /** Serializer of latest version. */ private RecordSerializer serializer; + /** Serializer latest version to use. */ + private int serializerVersion = LATEST_SERIALIZER_VERSION; + /** */ private volatile long oldestArchiveSegmentIdx; @@ -266,7 +277,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl "write ahead log archive directory" ); - serializer = new RecordV1Serializer(cctx); + serializer = forVersion(cctx, serializerVersion); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); @@ -818,10 +829,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileIO fileIO = ioFactory.create(curFile); try { - // readSerializerVersion will change the channel position. - // This is fine because the FileWriteHandle consitructor will move it - // to offset + len anyways. - int serVer = readSerializerVersion(fileIO, curFile, absIdx); + int serVer = serializerVersion; + + // If we have existing segment, try to read version from it. 
+ if (lastReadPtr != null) { + try { + serVer = readSerializerVersion(fileIO); + } + catch (SegmentEofException | EOFException ignore) { + serVer = serializerVersion; + } + } RecordSerializer ser = forVersion(cctx, serVer); @@ -837,13 +855,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl maxWalSegmentSize, ser); - if (lastReadPtr == null) { - HeaderRecord header = new HeaderRecord(serializer.version()); - - header.size(serializer.size(header)); - - hnd.addRecord(header); - } + // For new handle write serializer version to it. + if (lastReadPtr == null) + hnd.writeSerializerVersion(); archiver.currentWalIndex(absIdx); @@ -887,11 +901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl maxWalSegmentSize, serializer); - HeaderRecord header = new HeaderRecord(serializer.version()); - - header.size(serializer.size(header)); - - hnd.addRecord(header); + hnd.writeSerializerVersion(); return hnd; } @@ -1010,10 +1020,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * @param cctx Shared context. * @param ver Serializer version. * @return Entry serializer. 
*/ - static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { + public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { return forVersion(cctx, ver, false); } @@ -1027,7 +1038,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl switch (ver) { case 1: - return new RecordV1Serializer(cctx, writePointer); + return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer); + + case 2: + RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx)); + + return new RecordV2Serializer(dataV2Serializer, writePointer); default: throw new IgniteCheckedException("Failed to create a serializer with the given version " + @@ -1434,29 +1450,103 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Reads record serializer version from provided {@code io}. + * NOTE: Method mutates position of {@code io}. + * * @param io I/O interface for file. - * @param file File object. - * @param idx File index to read. * @return Serializer version stored in the file. - * @throws IOException If failed to read serializer version. * @throws IgniteCheckedException If failed to read serializer version. */ - private int readSerializerVersion(FileIO io, File file, long idx) - throws IOException, IgniteCheckedException { - try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())){ + public static int readSerializerVersion(FileIO io) + throws IgniteCheckedException, IOException { + try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) { FileInput in = new FileInput(io, buf); - // Header record must be agnostic to the serializer version. 
- WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0)); + in.ensure(RecordV1Serializer.HEADER_RECORD_SIZE); + + int recordType = in.readUnsignedByte(); + + if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) + throw new SegmentEofException("Reached logical end of the segment", null); + + WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1); + + if (type != WALRecord.RecordType.HEADER_RECORD) + throw new IOException("Can't read serializer version", null); + + // Read file pointer. + FileWALPointer ptr = RecordV1Serializer.readPosition(in); + + assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr; + + long headerMagicNumber = in.readLong(); + + if (headerMagicNumber != HeaderRecord.MAGIC) + throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + + ", actual=" + U.hexLong(headerMagicNumber) + ']'); + + // Read serializer version. + int version = in.readInt(); + + // Read and skip CRC. + in.readInt(); + + return version; + } + } + + /** + * Writes record serializer version to provided {@code io}. + * NOTE: Method mutates position of {@code io}. + * + * @param io I/O interface for file. + * @param idx Segment index. + * @param version Serializer version. + * @return I/O position after write version. + * @throws IOException If failed to write serializer version. + */ + public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); + buffer.order(ByteOrder.nativeOrder()); + + // Write record type. + buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); + + // Write position. + RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0)); - if (rec.type() != WALRecord.RecordType.HEADER_RECORD) - throw new IOException("Missing file header record: " + file.getAbsoluteFile()); + // Place magic number. 
+ buffer.putLong(HeaderRecord.MAGIC); - return ((HeaderRecord)rec).version(); + // Place serializer version. + buffer.putInt(version); + + // Place CRC if needed. + if (!RecordV1Serializer.SKIP_CRC) { + int curPos = buffer.position(); + + buffer.position(0); + + // This call will move buffer position to the end of the record again. + int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos); + + buffer.putInt(crcVal); } - catch (SegmentEofException | EOFException ignore) { - return serializer.version(); + else + buffer.putInt(0); + + // Write header record through io. + buffer.position(0); + + do { + io.write(buffer); } + while (buffer.hasRemaining()); + + // Flush + io.force(); + + return io.position(); } /** @@ -1715,6 +1805,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Write serializer version to current handle. + * NOTE: Method mutates {@code fileIO} position, written and lastFsyncPos fields. + * + * @throws IgniteCheckedException If fail to write serializer version. + */ + public void writeSerializerVersion() throws IgniteCheckedException { + try { + assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position(); + + long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version()); + + written = updatedPosition; + lastFsyncPos = updatedPosition; + head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false)); + } + catch (IOException e) { + throw new IgniteCheckedException("Unable to write serializer version for segment " + idx, e); + } + } + + /** * Checks if current head is a close fake record and returns {@code true} if so. * * @return {@code true} if current head is close record. 
@@ -2080,15 +2191,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert stopped() : "Segment is not closed after close flush: " + head.get(); try { - int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE; + int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE; + + if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { + RecordV1Serializer backwardSerializer = + new RecordV1Serializer(new RecordDataV1Serializer(cctx), true); - if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { - //it is expected there is sufficient space for this record because rollover should run early - final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); - buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1)); + final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); - final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1); - RecordV1Serializer.putPosition(buf, pointer); + SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); + segmentRecord.position( new FileWALPointer(idx, (int)written, -1)); + backwardSerializer.writeRecord(segmentRecord,buf); buf.rewind(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java new file mode 100644 index 0000000..242641d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java @@ -0,0 
+1,41 @@ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; + +/** + * Interface to provide size, read and write operations with WAL records + * without any headers and meta information. + */ +public interface RecordDataSerializer { + /** + * Calculates size of record data. + * + * @param record WAL record. + * @return Size of record in bytes. + * @throws IgniteCheckedException If it's unable to calculate record data size. + */ + int size(WALRecord record) throws IgniteCheckedException; + + /** + * Reads record data of {@code type} from buffer {@code in}. + * + * @param type Record type. + * @param in Buffer to read. + * @return WAL record. + * @throws IOException In case of I/O problems. + * @throws IgniteCheckedException If it's unable to read record. + */ + WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException; + + /** + * Writes record data to buffer {@code buf}. + * + * @param record WAL record. + * @param buf Buffer to write. + * @throws IgniteCheckedException If it's unable to write record. 
+ */ + void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java new file mode 100644 index 0000000..36298dc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalSegmentTailReachedException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.Nullable; + +/** + * An exception is thrown when we reached tail of WAL segment cyclic buffer + * during reading from WAL. 
+ */ +public class WalSegmentTailReachedException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public WalSegmentTailReachedException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 3a34e28..0fb8adf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -133,7 +133,7 @@ public class IgniteWalIteratorFactory { * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated * according its boundaries. 
*/ - public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) { + public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, int pageSize) { this(log, new PersistentStoreConfiguration().getFileIOFactory(), pageSize); } @@ -148,8 +148,7 @@ public class IgniteWalIteratorFactory { * @return closable WAL records iterator, should be closed when non needed * @throws IgniteCheckedException if failed to read folder */ - public WALIterator iteratorArchiveDirectory( - @NotNull final File walDirWithConsistentId) throws IgniteCheckedException { + public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException { return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, keepBinary); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index c92d572..f1258a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -100,14 +100,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * (BinaryObjects will be used instead) */ StandaloneWalRecordsIterator( - @NotNull final File walFilesDir, - @NotNull final IgniteLogger log, - @NotNull final GridCacheSharedContext sharedCtx, - @NotNull final FileIOFactory ioFactory, - final boolean keepBinary) 
throws IgniteCheckedException { + @NotNull File walFilesDir, + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory, + boolean keepBinary + ) throws IgniteCheckedException { super(log, sharedCtx, - new RecordV1Serializer(sharedCtx, true), + FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION), ioFactory, BUF_SIZE); this.keepBinary = keepBinary; @@ -127,15 +128,15 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * @param walFiles Wal files. */ StandaloneWalRecordsIterator( - @NotNull final IgniteLogger log, - @NotNull final GridCacheSharedContext sharedCtx, - @NotNull final FileIOFactory ioFactory, - final boolean workDir, - final boolean keepBinary, - @NotNull final File... walFiles) throws IgniteCheckedException { + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory, + boolean workDir, + boolean keepBinary, + @NotNull File... 
walFiles) throws IgniteCheckedException { super(log, sharedCtx, - new RecordV1Serializer(sharedCtx, true), + FileWriteAheadLogManager.forVersion(sharedCtx, FileWriteAheadLogManager.LATEST_SERIALIZER_VERSION), ioFactory, BUF_SIZE); http://git-wip-us.apache.org/repos/asf/ignite/blob/00770767/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java new file mode 100644 index 0000000..8b5e6ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -0,0 +1,1574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.DataInput; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.record.CacheState; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; +import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.FixLeftmostChildRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.FixRemoveId; +import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.InnerReplaceRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord; +import 
org.apache.ignite.internal.pagemem.wal.record.delta.MergeRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageAddRootRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageCutRootRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootInlineRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastAllocatedIndex; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulFullSnapshotId; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.RemoveRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.ReplaceRecord; +import 
org.apache.ignite.internal.pagemem.wal.record.delta.SplitExistingPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE; + +/** + * Record data V1 serializer. + */ +public class RecordDataV1Serializer implements RecordDataSerializer { + /** Length of HEADER record data. 
*/ static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4; + + /** Cache shared context */ + private final GridCacheSharedContext cctx; + + /** Size of page used for PageMemory regions */ + private final int pageSize; + + /** Cache object processor used for reading {@link DataEntry DataEntries}. */ + private final IgniteCacheObjectProcessor co; + + /** Serializer of {@link TxRecord} records. */ + private TxRecordSerializer txRecordSerializer; + + /** + * @param cctx Cache shared context. + */ + public RecordDataV1Serializer(GridCacheSharedContext cctx) { + this.cctx = cctx; + this.txRecordSerializer = new TxRecordSerializer(cctx); + this.co = cctx.kernalContext().cacheObjects(); + this.pageSize = cctx.database().pageSize(); + } + + /** {@inheritDoc} */ + @Override public int size(WALRecord record) throws IgniteCheckedException { + /* Sizes mirror the bytes writeRecord() emits per record type (SWITCH_SEGMENT_RECORD is a special case, see below). */ + switch (record.type()) { + case PAGE_RECORD: + assert record instanceof PageSnapshot; + + PageSnapshot pageRec = (PageSnapshot)record; + + return pageRec.pageData().length + 12; + + case CHECKPOINT_RECORD: + CheckpointRecord cpRec = (CheckpointRecord)record; + + assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer : + "Invalid WAL record: " + cpRec; + + int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates()); + + FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); + + /* 16 (checkpoint UUID) + 1 (pointer flag) + 1 (end flag); a pointer adds 8 (long index) + 4 (offset) + 4 (length). */ + return 18 + cacheStatesSize + (walPtr == null ?
0 : 16); + + case META_PAGE_INIT: + return /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8; + + case PARTITION_META_PAGE_UPDATE_COUNTERS: + return /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1 + + /*allocatedIdxCandidate*/ 4; + + case MEMORY_RECOVERY: + return 8; + + case PARTITION_DESTROY: + return /*cacheId*/4 + /*partId*/4; + + case DATA_RECORD: + DataRecord dataRec = (DataRecord)record; + + return 4 + dataSize(dataRec); + + case HEADER_RECORD: + return HEADER_RECORD_DATA_SIZE; + + case DATA_PAGE_INSERT_RECORD: + DataPageInsertRecord diRec = (DataPageInsertRecord)record; + + return 4 + 8 + 2 + diRec.payload().length; + + case DATA_PAGE_UPDATE_RECORD: + DataPageUpdateRecord uRec = (DataPageUpdateRecord)record; + + return 4 + 8 + 2 + 4 + + uRec.payload().length; + + case DATA_PAGE_INSERT_FRAGMENT_RECORD: + final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record; + + return 4 + 8 + 8 + 4 + difRec.payloadSize(); + + case DATA_PAGE_REMOVE_RECORD: + return 4 + 8 + 1; + + case DATA_PAGE_SET_FREE_LIST_PAGE: + return 4 + 8 + 8; + + case INIT_NEW_PAGE_RECORD: + return 4 + 8 + 2 + 2 + 8; + + case BTREE_META_PAGE_INIT_ROOT: + return 4 + 8 + 8; + + case BTREE_META_PAGE_INIT_ROOT2: + return 4 + 8 + 8 + 2; + + case BTREE_META_PAGE_ADD_ROOT: + return 4 + 8 + 8; + + case BTREE_META_PAGE_CUT_ROOT: + return 4 + 8; + + case BTREE_INIT_NEW_ROOT: + NewRootInitRecord riRec = (NewRootInitRecord)record; + + return 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize(); + + case BTREE_PAGE_RECYCLE: + return 4 + 8 + 8; + + case BTREE_PAGE_INSERT: + InsertRecord inRec = (InsertRecord)record; + + return 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize(); + + case BTREE_FIX_LEFTMOST_CHILD: + return 4 + 8 + 8; + + case BTREE_FIX_COUNT: + return 4 + 8 + 2; + + case BTREE_PAGE_REPLACE: + ReplaceRecord rRec = (ReplaceRecord)record; + + return 4 + 8 + 2 + 2 + 2 +
rRec.io().getItemSize(); + + case BTREE_PAGE_REMOVE: + return 4 + 8 + 2 + 2; + + case BTREE_PAGE_INNER_REPLACE: + return 4 + 8 + 2 + 8 + 2 + 8; + + case BTREE_FORWARD_PAGE_SPLIT: + return 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2; + + case BTREE_EXISTING_PAGE_SPLIT: + return 4 + 8 + 2 + 8; + + case BTREE_PAGE_MERGE: + return 4 + 8 + 8 + 2 + 8 + 1; + + case BTREE_FIX_REMOVE_ID: + return 4 + 8 + 8; + + case PAGES_LIST_SET_NEXT: + return 4 + 8 + 8; + + case PAGES_LIST_SET_PREVIOUS: + return 4 + 8 + 8; + + case PAGES_LIST_INIT_NEW_PAGE: + return 4 + 8 + 4 + 4 + 8 + 8 + 8; + + case PAGES_LIST_ADD_PAGE: + return 4 + 8 + 8; + + case PAGES_LIST_REMOVE_PAGE: + return 4 + 8 + 8; + + case TRACKING_PAGE_DELTA: + return 4 + 8 + 8 + 8 + 8; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: + return 4 + 8 + 8 + 8; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: + return 4 + 8 + 8; + + case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: + return 4 + 8 + 8; + + case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: + return 4 + 8 + 4; + + case PART_META_UPDATE_STATE: + return /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8; + + case PAGE_LIST_META_RESET_COUNT_RECORD: + return /*cacheId*/ 4 + /*pageId*/ 8; + + case SWITCH_SEGMENT_RECORD: + // CRC is not loaded for switch segment.
+ return -CRC_SIZE; + + case TX_RECORD: + return txRecordSerializer.sizeOfTxRecord((TxRecord)record); + + default: + throw new UnsupportedOperationException("Type: " + record.type()); + } + } + + /** {@inheritDoc} */ + @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + WALRecord res; + + /* Each case reads fields back in the same order writeRecord() writes them. */ + switch (type) { + case PAGE_RECORD: + byte[] arr = new byte[pageSize]; + + int cacheId = in.readInt(); + long pageId = in.readLong(); + + in.readFully(arr); + + res = new PageSnapshot(new FullPageId(pageId, cacheId), arr); + + break; + + case CHECKPOINT_RECORD: + long msb = in.readLong(); + long lsb = in.readLong(); + boolean hasPtr = in.readByte() != 0; + /* writeRecord() stores the segment index with putLong (and size() counts 8 bytes for it), so it must be read as a long; reading an int here would shift every following field by 4 bytes. */ + long idx = hasPtr ? in.readLong() : 0; + int offset = hasPtr ? in.readInt() : 0; + int len = hasPtr ? in.readInt() : 0; + + Map states = readPartitionStates(in); + + boolean end = in.readByte() != 0; + + FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null; + + CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end); + + cpRec.cacheGroupStates(states); + + res = cpRec; + + break; + + case META_PAGE_INIT: + cacheId = in.readInt(); + pageId = in.readLong(); + + int ioType = in.readUnsignedShort(); + int ioVer = in.readUnsignedShort(); + long treeRoot = in.readLong(); + long reuseListRoot = in.readLong(); + + res = new MetaPageInitRecord(cacheId, pageId, ioType, ioVer, treeRoot, reuseListRoot); + + break; + + case PARTITION_META_PAGE_UPDATE_COUNTERS: + cacheId = in.readInt(); + pageId = in.readLong(); + + long updCntr = in.readLong(); + long rmvId = in.readLong(); + int partSize = in.readInt(); + long countersPageId = in.readLong(); + byte state = in.readByte(); + int allocatedIdxCandidate = in.readInt(); + + res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate); + + break; + + case MEMORY_RECOVERY: + long ts =
in.readLong(); + + res = new MemoryRecoveryRecord(ts); + + break; + + case PARTITION_DESTROY: + cacheId = in.readInt(); + int partId = in.readInt(); + + res = new PartitionDestroyRecord(cacheId, partId); + + break; + + case DATA_RECORD: + int entryCnt = in.readInt(); + + List entries = new ArrayList<>(entryCnt); + + for (int i = 0; i < entryCnt; i++) + entries.add(readDataEntry(in)); + + res = new DataRecord(entries); + + break; + + case HEADER_RECORD: + long magic = in.readLong(); + + if (magic != HeaderRecord.MAGIC) + throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + + ", actual=" + U.hexLong(magic) + ']'); + + int ver = in.readInt(); + + res = new HeaderRecord(ver); + + break; + + case DATA_PAGE_INSERT_RECORD: { + cacheId = in.readInt(); + pageId = in.readLong(); + + int size = in.readUnsignedShort(); + + in.ensure(size); + + byte[] payload = new byte[size]; + + in.readFully(payload); + + res = new DataPageInsertRecord(cacheId, pageId, payload); + + break; + } + + case DATA_PAGE_UPDATE_RECORD: { + cacheId = in.readInt(); + pageId = in.readLong(); + + int itemId = in.readInt(); + + int size = in.readUnsignedShort(); + + in.ensure(size); + + byte[] payload = new byte[size]; + + in.readFully(payload); + + res = new DataPageUpdateRecord(cacheId, pageId, itemId, payload); + + break; + } + + case DATA_PAGE_INSERT_FRAGMENT_RECORD: { + cacheId = in.readInt(); + pageId = in.readLong(); + + final long lastLink = in.readLong(); + final int payloadSize = in.readInt(); + + final byte[] payload = new byte[payloadSize]; + + in.readFully(payload); + + res = new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink); + + break; + } + + case DATA_PAGE_REMOVE_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + int itemId = in.readUnsignedByte(); + + res = new DataPageRemoveRecord(cacheId, pageId, itemId); + + break; + + case DATA_PAGE_SET_FREE_LIST_PAGE: + cacheId = in.readInt(); + pageId = in.readLong(); + + long
freeListPage = in.readLong(); + + res = new DataPageSetFreeListPageRecord(cacheId, pageId, freeListPage); + + break; + + case INIT_NEW_PAGE_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + ioType = in.readUnsignedShort(); + ioVer = in.readUnsignedShort(); + long virtualPageId = in.readLong(); + + res = new InitNewPageRecord(cacheId, pageId, ioType, ioVer, virtualPageId); + + break; + + case BTREE_META_PAGE_INIT_ROOT: + cacheId = in.readInt(); + pageId = in.readLong(); + + long rootId = in.readLong(); + + res = new MetaPageInitRootRecord(cacheId, pageId, rootId); + + break; + + case BTREE_META_PAGE_INIT_ROOT2: + cacheId = in.readInt(); + pageId = in.readLong(); + + long rootId2 = in.readLong(); + int inlineSize = in.readShort(); + + res = new MetaPageInitRootInlineRecord(cacheId, pageId, rootId2, inlineSize); + + break; + + case BTREE_META_PAGE_ADD_ROOT: + cacheId = in.readInt(); + pageId = in.readLong(); + + rootId = in.readLong(); + + res = new MetaPageAddRootRecord(cacheId, pageId, rootId); + + break; + + case BTREE_META_PAGE_CUT_ROOT: + cacheId = in.readInt(); + pageId = in.readLong(); + + res = new MetaPageCutRootRecord(cacheId, pageId); + + break; + + case BTREE_INIT_NEW_ROOT: + cacheId = in.readInt(); + pageId = in.readLong(); + + rootId = in.readLong(); + ioType = in.readUnsignedShort(); + ioVer = in.readUnsignedShort(); + long leftId = in.readLong(); + long rightId = in.readLong(); + + BPlusIO io = BPlusIO.getBPlusIO(ioType, ioVer); + + byte[] rowBytes = new byte[io.getItemSize()]; + + in.readFully(rowBytes); + + res = new NewRootInitRecord<>(cacheId, pageId, rootId, (BPlusInnerIO)io, leftId, rowBytes, rightId); + + break; + + case BTREE_PAGE_RECYCLE: + cacheId = in.readInt(); + pageId = in.readLong(); + + long newPageId = in.readLong(); + + res = new RecycleRecord(cacheId, pageId, newPageId); + + break; + + case BTREE_PAGE_INSERT: + cacheId = in.readInt(); + pageId = in.readLong(); + + ioType = in.readUnsignedShort(); + ioVer =
in.readUnsignedShort(); + int itemIdx = in.readUnsignedShort(); + rightId = in.readLong(); + + io = BPlusIO.getBPlusIO(ioType, ioVer); + + rowBytes = new byte[io.getItemSize()]; + + in.readFully(rowBytes); + + res = new InsertRecord<>(cacheId, pageId, io, itemIdx, rowBytes, rightId); + + break; + + case BTREE_FIX_LEFTMOST_CHILD: + cacheId = in.readInt(); + pageId = in.readLong(); + + rightId = in.readLong(); + + res = new FixLeftmostChildRecord(cacheId, pageId, rightId); + + break; + + case BTREE_FIX_COUNT: + cacheId = in.readInt(); + pageId = in.readLong(); + + int cnt = in.readUnsignedShort(); + + res = new FixCountRecord(cacheId, pageId, cnt); + + break; + + case BTREE_PAGE_REPLACE: + cacheId = in.readInt(); + pageId = in.readLong(); + + ioType = in.readUnsignedShort(); + ioVer = in.readUnsignedShort(); + itemIdx = in.readUnsignedShort(); + + io = BPlusIO.getBPlusIO(ioType, ioVer); + + rowBytes = new byte[io.getItemSize()]; + + in.readFully(rowBytes); + + res = new ReplaceRecord<>(cacheId, pageId, io, rowBytes, itemIdx); + + break; + + case BTREE_PAGE_REMOVE: + cacheId = in.readInt(); + pageId = in.readLong(); + + itemIdx = in.readUnsignedShort(); + cnt = in.readUnsignedShort(); + + res = new RemoveRecord(cacheId, pageId, itemIdx, cnt); + + break; + + case BTREE_PAGE_INNER_REPLACE: + cacheId = in.readInt(); + pageId = in.readLong(); + + int dstIdx = in.readUnsignedShort(); + long srcPageId = in.readLong(); + int srcIdx = in.readUnsignedShort(); + rmvId = in.readLong(); + + res = new InnerReplaceRecord<>(cacheId, pageId, dstIdx, srcPageId, srcIdx, rmvId); + + break; + + case BTREE_FORWARD_PAGE_SPLIT: + cacheId = in.readInt(); + pageId = in.readLong(); + + long fwdId = in.readLong(); + ioType = in.readUnsignedShort(); + ioVer = in.readUnsignedShort(); + srcPageId = in.readLong(); + int mid = in.readUnsignedShort(); + cnt = in.readUnsignedShort(); + + res = new SplitForwardPageRecord(cacheId, pageId, fwdId, ioType, ioVer, srcPageId, mid, cnt); + + break; + + case
BTREE_EXISTING_PAGE_SPLIT: + cacheId = in.readInt(); + pageId = in.readLong(); + + mid = in.readUnsignedShort(); + fwdId = in.readLong(); + + res = new SplitExistingPageRecord(cacheId, pageId, mid, fwdId); + + break; + + case BTREE_PAGE_MERGE: + cacheId = in.readInt(); + pageId = in.readLong(); + + long prntId = in.readLong(); + int prntIdx = in.readUnsignedShort(); + rightId = in.readLong(); + boolean emptyBranch = in.readBoolean(); + + res = new MergeRecord<>(cacheId, pageId, prntId, prntIdx, rightId, emptyBranch); + + break; + + case BTREE_FIX_REMOVE_ID: + cacheId = in.readInt(); + pageId = in.readLong(); + + rmvId = in.readLong(); + + res = new FixRemoveId(cacheId, pageId, rmvId); + + break; + + case PAGES_LIST_SET_NEXT: + cacheId = in.readInt(); + pageId = in.readLong(); + long nextPageId = in.readLong(); + + res = new PagesListSetNextRecord(cacheId, pageId, nextPageId); + + break; + + case PAGES_LIST_SET_PREVIOUS: + cacheId = in.readInt(); + pageId = in.readLong(); + long prevPageId = in.readLong(); + + res = new PagesListSetPreviousRecord(cacheId, pageId, prevPageId); + + break; + + case PAGES_LIST_INIT_NEW_PAGE: + cacheId = in.readInt(); + pageId = in.readLong(); + ioType = in.readInt(); + ioVer = in.readInt(); + newPageId = in.readLong(); + prevPageId = in.readLong(); + long addDataPageId = in.readLong(); + + res = new PagesListInitNewPageRecord(cacheId, pageId, ioType, ioVer, newPageId, prevPageId, addDataPageId); + + break; + + case PAGES_LIST_ADD_PAGE: + cacheId = in.readInt(); + pageId = in.readLong(); + long dataPageId = in.readLong(); + + res = new PagesListAddPageRecord(cacheId, pageId, dataPageId); + + break; + + case PAGES_LIST_REMOVE_PAGE: + cacheId = in.readInt(); + pageId = in.readLong(); + long rmvdPageId = in.readLong(); + + res = new PagesListRemovePageRecord(cacheId, pageId, rmvdPageId); + + break; + + case TRACKING_PAGE_DELTA: + cacheId = in.readInt(); + pageId = in.readLong(); + + long pageIdToMark = in.readLong(); + long nextSnapshotId0
= in.readLong(); + long lastSuccessfulSnapshotId0 = in.readLong(); + + res = new TrackingPageDeltaRecord(cacheId, pageId, pageIdToMark, nextSnapshotId0, lastSuccessfulSnapshotId0); + + break; + + case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: + cacheId = in.readInt(); + pageId = in.readLong(); + + long nextSnapshotId = in.readLong(); + + res = new MetaPageUpdateNextSnapshotId(cacheId, pageId, nextSnapshotId); + + break; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: + cacheId = in.readInt(); + pageId = in.readLong(); + + long lastSuccessfulFullSnapshotId = in.readLong(); + + res = new MetaPageUpdateLastSuccessfulFullSnapshotId(cacheId, pageId, lastSuccessfulFullSnapshotId); + + break; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: + cacheId = in.readInt(); + pageId = in.readLong(); + + long lastSuccessfulSnapshotId = in.readLong(); + long lastSuccessfulSnapshotTag = in.readLong(); + + res = new MetaPageUpdateLastSuccessfulSnapshotId(cacheId, pageId, lastSuccessfulSnapshotId, lastSuccessfulSnapshotTag); + + break; + + case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: + cacheId = in.readInt(); + pageId = in.readLong(); + + int lastAllocatedIdx = in.readInt(); + + res = new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, lastAllocatedIdx); + + break; + + case PART_META_UPDATE_STATE: + cacheId = in.readInt(); + partId = in.readInt(); + + state = in.readByte(); + + long updateCounter = in.readLong(); + + res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter); + + break; + + case PAGE_LIST_META_RESET_COUNT_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + res = new PageListMetaResetCountRecord(cacheId, pageId); + break; + + case SWITCH_SEGMENT_RECORD: + throw new EOFException("END OF SEGMENT"); + + case TX_RECORD: + res = txRecordSerializer.readTxRecord(in); + + break; + + default: + throw new UnsupportedOperationException("Type: " + type); + } + + return res; + } + + /** {@inheritDoc} */ +
@Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException { /* Each case writes fields in the order readRecord() reads them back; the byte counts are mirrored by size(). */ + switch (record.type()) { + case PAGE_RECORD: + PageSnapshot snap = (PageSnapshot)record; + + buf.putInt(snap.fullPageId().groupId()); + buf.putLong(snap.fullPageId().pageId()); + buf.put(snap.pageData()); + + break; + + case MEMORY_RECOVERY: + MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record; + + buf.putLong(memoryRecoveryRecord.time()); + + break; + + case PARTITION_DESTROY: + PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record; + + buf.putInt(partDestroy.groupId()); + buf.putInt(partDestroy.partitionId()); + + break; + + case META_PAGE_INIT: + MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record; + + buf.putInt(updRootsRec.groupId()); + buf.putLong(updRootsRec.pageId()); + + buf.putShort((short)updRootsRec.ioType()); + buf.putShort((short)updRootsRec.ioVersion()); + buf.putLong(updRootsRec.treeRoot()); + buf.putLong(updRootsRec.reuseListRoot()); + + break; + + case PARTITION_META_PAGE_UPDATE_COUNTERS: + MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record; + + buf.putInt(partDataRec.groupId()); + buf.putLong(partDataRec.pageId()); + + buf.putLong(partDataRec.updateCounter()); + buf.putLong(partDataRec.globalRemoveId()); + buf.putInt(partDataRec.partitionSize()); + buf.putLong(partDataRec.countersPageId()); + buf.put(partDataRec.state()); + buf.putInt(partDataRec.allocatedIndexCandidate()); + + break; + + case CHECKPOINT_RECORD: /* Layout: checkpoint UUID (two longs), pointer flag byte, optional pointer (long index, int offset, int length), cache group states, end flag byte. */ + CheckpointRecord cpRec = (CheckpointRecord)record; + + assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer : + "Invalid WAL record: " + cpRec; + + FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); + UUID cpId = cpRec.checkpointId(); + + buf.putLong(cpId.getMostSignificantBits()); + buf.putLong(cpId.getLeastSignificantBits()); + + buf.put(walPtr == null ?
(byte)0 : 1); + + if (walPtr != null) { + buf.putLong(walPtr.index()); + buf.putInt(walPtr.fileOffset()); + buf.putInt(walPtr.length()); + } + + putCacheStates(buf, cpRec.cacheGroupStates()); + + buf.put(cpRec.end() ? (byte)1 : 0); + + break; + + case DATA_RECORD: + DataRecord dataRec = (DataRecord)record; + + buf.putInt(dataRec.writeEntries().size()); + + for (DataEntry dataEntry : dataRec.writeEntries()) + putDataEntry(buf, dataEntry); + + break; + + case HEADER_RECORD: + buf.putLong(HeaderRecord.MAGIC); + + buf.putInt(((HeaderRecord)record).version()); + + break; + + case DATA_PAGE_INSERT_RECORD: + DataPageInsertRecord diRec = (DataPageInsertRecord)record; + + buf.putInt(diRec.groupId()); + buf.putLong(diRec.pageId()); + + buf.putShort((short)diRec.payload().length); + + buf.put(diRec.payload()); + + break; + + case DATA_PAGE_UPDATE_RECORD: + DataPageUpdateRecord uRec = (DataPageUpdateRecord)record; + + buf.putInt(uRec.groupId()); + buf.putLong(uRec.pageId()); + buf.putInt(uRec.itemId()); + + buf.putShort((short)uRec.payload().length); + + buf.put(uRec.payload()); + + break; + + case DATA_PAGE_INSERT_FRAGMENT_RECORD: + final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record; + + buf.putInt(difRec.groupId()); + buf.putLong(difRec.pageId()); + + buf.putLong(difRec.lastLink()); + buf.putInt(difRec.payloadSize()); + buf.put(difRec.payload()); + + break; + + case DATA_PAGE_REMOVE_RECORD: + DataPageRemoveRecord drRec = (DataPageRemoveRecord)record; + + buf.putInt(drRec.groupId()); + buf.putLong(drRec.pageId()); + + buf.put((byte)drRec.itemId()); + + break; + + case DATA_PAGE_SET_FREE_LIST_PAGE: + DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record; + + buf.putInt(freeListRec.groupId()); + buf.putLong(freeListRec.pageId()); + + buf.putLong(freeListRec.freeListPage()); + + break; + + case INIT_NEW_PAGE_RECORD: + InitNewPageRecord inpRec = (InitNewPageRecord)record; + + buf.putInt(inpRec.groupId()); +
buf.putLong(inpRec.pageId()); + + buf.putShort((short)inpRec.ioType()); + buf.putShort((short)inpRec.ioVersion()); + buf.putLong(inpRec.newPageId()); + + break; + + case BTREE_META_PAGE_INIT_ROOT: + MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record; + + buf.putInt(imRec.groupId()); + buf.putLong(imRec.pageId()); + + buf.putLong(imRec.rootId()); + + break; + + case BTREE_META_PAGE_INIT_ROOT2: + MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record; + + buf.putInt(imRec2.groupId()); + buf.putLong(imRec2.pageId()); + + buf.putLong(imRec2.rootId()); + + buf.putShort((short)imRec2.inlineSize()); + break; + + case BTREE_META_PAGE_ADD_ROOT: + MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record; + + buf.putInt(arRec.groupId()); + buf.putLong(arRec.pageId()); + + buf.putLong(arRec.rootId()); + + break; + + case BTREE_META_PAGE_CUT_ROOT: + MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record; + + buf.putInt(crRec.groupId()); + buf.putLong(crRec.pageId()); + + break; + + case BTREE_INIT_NEW_ROOT: + NewRootInitRecord riRec = (NewRootInitRecord)record; + + buf.putInt(riRec.groupId()); + buf.putLong(riRec.pageId()); + + buf.putLong(riRec.rootId()); + buf.putShort((short)riRec.io().getType()); + buf.putShort((short)riRec.io().getVersion()); + buf.putLong(riRec.leftId()); + buf.putLong(riRec.rightId()); + + putRow(buf, riRec.rowBytes()); + + break; + + case BTREE_PAGE_RECYCLE: + RecycleRecord recRec = (RecycleRecord)record; + + buf.putInt(recRec.groupId()); + buf.putLong(recRec.pageId()); + + buf.putLong(recRec.newPageId()); + + break; + + case BTREE_PAGE_INSERT: + InsertRecord inRec = (InsertRecord)record; + + buf.putInt(inRec.groupId()); + buf.putLong(inRec.pageId()); + + buf.putShort((short)inRec.io().getType()); + buf.putShort((short)inRec.io().getVersion()); + buf.putShort((short)inRec.index()); + buf.putLong(inRec.rightId()); + + putRow(buf, inRec.rowBytes()); + + break; + + case BTREE_FIX_LEFTMOST_CHILD: +
FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record; + + buf.putInt(flRec.groupId()); + buf.putLong(flRec.pageId()); + + buf.putLong(flRec.rightId()); + + break; + + case BTREE_FIX_COUNT: + FixCountRecord fcRec = (FixCountRecord)record; + + buf.putInt(fcRec.groupId()); + buf.putLong(fcRec.pageId()); + + buf.putShort((short)fcRec.count()); + + break; + + case BTREE_PAGE_REPLACE: + ReplaceRecord rRec = (ReplaceRecord)record; + + buf.putInt(rRec.groupId()); + buf.putLong(rRec.pageId()); + + buf.putShort((short)rRec.io().getType()); + buf.putShort((short)rRec.io().getVersion()); + buf.putShort((short)rRec.index()); + + putRow(buf, rRec.rowBytes()); + + break; + + case BTREE_PAGE_REMOVE: + RemoveRecord rmRec = (RemoveRecord)record; + + buf.putInt(rmRec.groupId()); + buf.putLong(rmRec.pageId()); + + buf.putShort((short)rmRec.index()); + buf.putShort((short)rmRec.count()); + + break; + + case BTREE_PAGE_INNER_REPLACE: + InnerReplaceRecord irRec = (InnerReplaceRecord)record; + + buf.putInt(irRec.groupId()); + buf.putLong(irRec.pageId()); + + buf.putShort((short)irRec.destinationIndex()); + buf.putLong(irRec.sourcePageId()); + buf.putShort((short)irRec.sourceIndex()); + buf.putLong(irRec.removeId()); + + break; + + case BTREE_FORWARD_PAGE_SPLIT: + SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record; + + buf.putInt(sfRec.groupId()); + buf.putLong(sfRec.pageId()); + + buf.putLong(sfRec.forwardId()); + buf.putShort((short)sfRec.ioType()); + buf.putShort((short)sfRec.ioVersion()); + buf.putLong(sfRec.sourcePageId()); + buf.putShort((short)sfRec.middleIndex()); + buf.putShort((short)sfRec.count()); + + break; + + case BTREE_EXISTING_PAGE_SPLIT: + SplitExistingPageRecord seRec = (SplitExistingPageRecord)record; + + buf.putInt(seRec.groupId()); + buf.putLong(seRec.pageId()); + + buf.putShort((short)seRec.middleIndex()); + buf.putLong(seRec.forwardId()); + + break; + + case BTREE_PAGE_MERGE: + MergeRecord mRec = (MergeRecord)record; + +
buf.putInt(mRec.groupId()); + buf.putLong(mRec.pageId()); + + buf.putLong(mRec.parentId()); + buf.putShort((short)mRec.parentIndex()); + buf.putLong(mRec.rightId()); + buf.put((byte)(mRec.isEmptyBranch() ? 1 : 0)); + + break; + + case PAGES_LIST_SET_NEXT: + PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record; + + buf.putInt(plNextRec.groupId()); + buf.putLong(plNextRec.pageId()); + + buf.putLong(plNextRec.nextPageId()); + + break; + + case PAGES_LIST_SET_PREVIOUS: + PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record; + + buf.putInt(plPrevRec.groupId()); + buf.putLong(plPrevRec.pageId()); + + buf.putLong(plPrevRec.previousPageId()); + + break; + + case PAGES_LIST_INIT_NEW_PAGE: + PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record; + + buf.putInt(plNewRec.groupId()); + buf.putLong(plNewRec.pageId()); + buf.putInt(plNewRec.ioType()); + buf.putInt(plNewRec.ioVersion()); + buf.putLong(plNewRec.newPageId()); + + buf.putLong(plNewRec.previousPageId()); + buf.putLong(plNewRec.dataPageId()); + + break; + + case PAGES_LIST_ADD_PAGE: + PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record; + + buf.putInt(plAddRec.groupId()); + buf.putLong(plAddRec.pageId()); + + buf.putLong(plAddRec.dataPageId()); + + break; + + case PAGES_LIST_REMOVE_PAGE: + PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record; + + buf.putInt(plRmvRec.groupId()); + buf.putLong(plRmvRec.pageId()); + + buf.putLong(plRmvRec.removedPageId()); + + break; + + case BTREE_FIX_REMOVE_ID: + FixRemoveId frRec = (FixRemoveId)record; + + buf.putInt(frRec.groupId()); + buf.putLong(frRec.pageId()); + + buf.putLong(frRec.removeId()); + + break; + + case TRACKING_PAGE_DELTA: + TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record; + + buf.putInt(tpDelta.groupId()); + buf.putLong(tpDelta.pageId()); + + buf.putLong(tpDelta.pageIdToMark()); + buf.putLong(tpDelta.nextSnapshotId()); +
buf.putLong(tpDelta.lastSuccessfulSnapshotId()); + + break; + + case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: + MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record; + + buf.putInt(mpUpdateNextSnapshotId.groupId()); + buf.putLong(mpUpdateNextSnapshotId.pageId()); + + buf.putLong(mpUpdateNextSnapshotId.nextSnapshotId()); + + break; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: + MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId = + (MetaPageUpdateLastSuccessfulFullSnapshotId)record; + + buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId()); + buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId()); + + buf.putLong(mpUpdateLastSuccFullSnapshotId.lastSuccessfulFullSnapshotId()); + + break; + + case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: + MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId = + (MetaPageUpdateLastSuccessfulSnapshotId)record; + + buf.putInt(mpUpdateLastSuccSnapshotId.groupId()); + buf.putLong(mpUpdateLastSuccSnapshotId.pageId()); + + buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotId()); + buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotTag()); + + break; + + case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: + MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx = + (MetaPageUpdateLastAllocatedIndex) record; + + buf.putInt(mpUpdateLastAllocatedIdx.groupId()); + buf.putLong(mpUpdateLastAllocatedIdx.pageId()); + + buf.putInt(mpUpdateLastAllocatedIdx.lastAllocatedIndex()); + + break; + + case PART_META_UPDATE_STATE: + PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record; + + buf.putInt(partMetaStateRecord.groupId()); + + buf.putInt(partMetaStateRecord.partitionId()); + + buf.put(partMetaStateRecord.state()); + + buf.putLong(partMetaStateRecord.updateCounter()); + + break; + + case PAGE_LIST_META_RESET_COUNT_RECORD: + PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record; + +
buf.putInt(pageListMetaResetCntRecord.groupId()); + buf.putLong(pageListMetaResetCntRecord.pageId()); + + break; + + case TX_RECORD: + txRecordSerializer.writeTxRecord((TxRecord)record, buf); + + break; + + case SWITCH_SEGMENT_RECORD: /* Nothing is written for this record type. */ + break; + + default: + throw new UnsupportedOperationException("Type: " + record.type()); + } + } + + /** + * Writes a single data entry: cache ID, key, value (or {@code -1} when the value is {@code null}), + * operation ordinal as a byte, near XID version ({@code null} allowed), write version, + * partition ID, partition counter and expire time. + * + * @param buf Buffer to write to. + * @param entry Data entry. + * @throws IgniteCheckedException If writing the key or value fails. + */ + private static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException { + buf.putInt(entry.cacheId()); + + if (!entry.key().putValue(buf)) + throw new AssertionError(); + + if (entry.value() == null) + buf.putInt(-1); + else if (!entry.value().putValue(buf)) + throw new AssertionError(); + + buf.put((byte)entry.op().ordinal()); + + putVersion(buf, entry.nearXidVersion(), true); + putVersion(buf, entry.writeVersion(), false); + + buf.putInt(entry.partitionId()); + buf.putLong(entry.partitionCounter()); + buf.putLong(entry.expireTime()); + } + + /** + * Writes cache group states: a short group count, then for each group its int ID, a short partition count + * and, for each partition, a short partition ID, a long partition size and a long partition counter. + * + * @param buf Buffer to write to. + * @param states Cache states keyed by cache group ID. + */ + private static void putCacheStates(ByteBuffer buf, Map states) { + buf.putShort((short)states.size()); + + for (Map.Entry entry : states.entrySet()) { + buf.putInt(entry.getKey()); + + CacheState state = entry.getValue(); + + // Need 2 bytes for the number of partitions. + buf.putShort((short)state.size()); + + for (int i = 0; i < state.size(); i++) { + buf.putShort((short)state.partitionByIndex(i)); + + buf.putLong(state.partitionSizeByIndex(i)); + buf.putLong(state.partitionCounterByIndex(i)); + } + } + } + + /** + * Writes a cache version using {@link CacheVersionIO#write}. + * + * @param buf Buffer. + * @param ver Version to write. + * @param allowNull Whether a {@code null} version is allowed. + */ + private static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) { + CacheVersionIO.write(buf, ver, allowNull); + } + + /** + * Writes raw row bytes; used by the B+ tree record cases of writeRecord. + * + * @param buf Buffer. + * @param rowBytes Row bytes; expected to be non-empty (checked by an assertion). +
*/ + @SuppressWarnings("unchecked") + private static void putRow(ByteBuffer buf, byte[] rowBytes) { /* NOTE(review): no unchecked operation is performed here, so the @SuppressWarnings looks redundant. */ + assert rowBytes.length > 0; + + buf.put(rowBytes); + } + + /** + * Reads one data entry in the layout written by putDataEntry. A negative value size denotes a {@code null} value. + * If the entry's cache context is available locally, the key and value are materialized as cache objects; + * otherwise a {@link LazyDataEntry} holding the raw bytes is returned. + * + * @param in Input to read from. + * @return Read entry. + * @throws IOException If reading from {@code in} fails. + */ + private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + int cacheId = in.readInt(); + + int keySize = in.readInt(); + byte keyType = in.readByte(); + byte[] keyBytes = new byte[keySize]; + in.readFully(keyBytes); + + int valSize = in.readInt(); + + byte valType = 0; + byte[] valBytes = null; + + if (valSize >= 0) { + valType = in.readByte(); + valBytes = new byte[valSize]; + in.readFully(valBytes); + } + + byte ord = in.readByte(); + + GridCacheOperation op = GridCacheOperation.fromOrdinal(ord & 0xFF); + + GridCacheVersion nearXidVer = readVersion(in, true); + GridCacheVersion writeVer = readVersion(in, false); + + int partId = in.readInt(); + long partCntr = in.readLong(); + long expireTime = in.readLong(); + + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (cacheCtx != null) { + CacheObjectContext coCtx = cacheCtx.cacheObjectContext(); + + KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes); + CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null; + + return new DataEntry( + cacheId, + key, + val, + op, + nearXidVer, + writeVer, + expireTime, + partId, + partCntr + ); + } + else + return new LazyDataEntry( + cctx, + cacheId, + keyType, + keyBytes, + valType, + valBytes, + op, + nearXidVer, + writeVer, + expireTime, + partId, + partCntr); + } + + /** + * @param buf Buffer to read from. + * @return Read map.
+ */ + private Map readPartitionStates(DataInput buf) throws IOException { + int caches = buf.readShort() & 0xFFFF; + + if (caches == 0) + return Collections.emptyMap(); + + Map states = new HashMap<>(caches, 1.0f); + + for (int i = 0; i < caches; i++) { + int cacheId = buf.readInt(); + + int parts = buf.readShort() & 0xFFFF; + + CacheState state = new CacheState(parts); + + for (int p = 0; p < parts; p++) { + int partId = buf.readShort() & 0xFFFF; + long size = buf.readLong(); + long partCntr = buf.readLong(); + + state.addPartitionState(partId, size, partCntr); + } + + states.put(cacheId, state); + } + + return states; + } + + /** + * Changes the buffer position by the number of read bytes. + * + * @param in Data input to read from. + * @param allowNull Is {@code null}version allowed. + * @return Read cache version. + */ + private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException { + // To be able to read serialization protocol version. + in.ensure(1); + + try { + int size = CacheVersionIO.readSize(in.buffer(), allowNull); + + in.ensure(size); + + return CacheVersionIO.read(in.buffer(), allowNull); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** + * @param dataRec Data record to serialize. + * @return Full data record size. + * @throws IgniteCheckedException If failed to obtain the length of one of the entries. + */ + private int dataSize(DataRecord dataRec) throws IgniteCheckedException { + int sz = 0; + + for (DataEntry entry : dataRec.writeEntries()) + sz += entrySize(entry); + + return sz; + } + + /** + * @param entry Entry to get size for. + * @return Entry size. + * @throws IgniteCheckedException If failed to get key or value bytes length. 
+ */ + private int entrySize(DataEntry entry) throws IgniteCheckedException { + GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId()); + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + return + /*cache ID*/4 + + /*key*/entry.key().valueBytesLength(coCtx) + + /*value*/(entry.value() == null ? 4 : entry.value().valueBytesLength(coCtx)) + + /*op*/1 + + /*near xid ver*/CacheVersionIO.size(entry.nearXidVersion(), true) + + /*write ver*/CacheVersionIO.size(entry.writeVersion(), false) + + /*part ID*/4 + + /*expire Time*/8 + + /*part cnt*/8; + } + + /** + * @param states Partition states. + * @return Size required to write partition states. + */ + private int cacheStatesSize(Map states) { + // Need 4 bytes for the number of caches. + int size = 2; + + for (Map.Entry entry : states.entrySet()) { + // Cache ID. + size += 4; + + // Need 2 bytes for the number of partitions. + size += 2; + + CacheState state = entry.getValue(); + + // 2 bytes partition ID, size and counter per partition. + size += 18 * state.size(); + } + + return size; + } + +}