ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ag...@apache.org
Subject [1/2] ignite git commit: ignite-6029 IGNITE-6029 Record serializer refactoring and initial stuff for Record V2 serialization.
Date Wed, 27 Sep 2017 17:52:38 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-6029-master [created] 721df25fb


http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
new file mode 100644
index 0000000..2b55c5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+
+/**
+ * Record data V2 serializer: forwards to the V1 data serializer, but refuses to size or write {@link HeaderRecord}s.
+ */
+public class RecordDataV2Serializer implements RecordDataSerializer {
+    /** V1 data serializer that every accepted call is forwarded to. */
+    private final RecordDataV1Serializer delegateSerializer;
+
+    /**
+     * Creates a V2 data serializer that wraps the given V1 data serializer.
+     *
+     * @param delegateSerializer V1 data serializer to forward calls to (stored as-is, without a null check).
+     */
+    public RecordDataV2Serializer(RecordDataV1Serializer delegateSerializer) {
+        this.delegateSerializer = delegateSerializer;
+    }
+
+    /** {@inheritDoc} Rejects {@link HeaderRecord} with {@link UnsupportedOperationException}, else delegates. */
+    @Override public int size(WALRecord record) throws IgniteCheckedException {
+        if (record instanceof HeaderRecord)
+            throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer");
+
+        return delegateSerializer.size(record);
+    }
+
+    /** {@inheritDoc} Forwarded unchanged to the V1 delegate; no header-record check is made on this path. */
+    @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+        return delegateSerializer.readRecord(type, in);
+    }
+
+    /** {@inheritDoc} Rejects {@link HeaderRecord} with {@link UnsupportedOperationException}, else delegates. */
+    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+        if (record instanceof HeaderRecord)
+            throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer");
+
+        delegateSerializer.writeRecord(record, buf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index ce6fdc7..c4e1bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -21,84 +21,22 @@ import java.io.DataInput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.CacheState;
-import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
-import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
-import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
-import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
-import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
-import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.FixCountRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.FixLeftmostChildRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.FixRemoveId;
-import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.InnerReplaceRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MergeRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageAddRootRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageCutRootRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootInlineRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRootRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastAllocatedIndex;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulFullSnapshotId;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
-import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.RemoveRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.ReplaceRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.SplitExistingPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.SplitForwardPageRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.TrackingPageDeltaRecord;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
-import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
@@ -116,1328 +54,97 @@ public class RecordV1Serializer implements RecordSerializer {
     /** Length of Type */
     public static final int REC_TYPE_SIZE = 1;
 
-    /** Length of WAL Pointer */
-    public static final int FILE_WAL_POINTER_SIZE = 12;
+    /** Length of WAL Pointer: Index (8) + File offset (4). */
+    public static final int FILE_WAL_POINTER_SIZE = 8 + 4;
 
     /** Length of CRC value */
-    private static final int CRC_SIZE = 4;
+    public static final int CRC_SIZE = 4;
 
-    /** */
-    public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + /*Magic*/8 + /*Version*/4 + CRC_SIZE;
+    /** Total length of HEADER record. */
+    public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE + RecordDataV1Serializer.HEADER_RECORD_DATA_SIZE;
 
-    /** Cache shared context */
-    private GridCacheSharedContext cctx;
-
-    /** Size of page used for PageMemory regions */
-    private int pageSize;
-
-    /** Cache object processor to reading {@link DataEntry DataEntries} */
-    private IgniteCacheObjectProcessor co;
-
-    /** Skip CRC calculation/check flag */
-    private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
-
-    /** Write pointer. */
-    private final boolean writePointer;
-
-    /** Serializer of {@link TxRecord} records. */
-    private TxRecordSerializer txRecordSerializer;
-
-    /**
-     * @param cctx Cache shared context.
-     */
-    public RecordV1Serializer(GridCacheSharedContext cctx) {
-        this(cctx, false);
-    }
-
-    /**
-     * @param cctx Cache shared context.
-     */
-    public RecordV1Serializer(GridCacheSharedContext cctx, boolean writePointer) {
-        this.cctx = cctx;
-        this.writePointer = writePointer;
-
-        co = cctx.kernalContext().cacheObjects();
-        pageSize = cctx.database().pageSize();
-        txRecordSerializer = new TxRecordSerializer(cctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int version() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writePointer() {
-        return writePointer;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("CastConflictsWithInstanceof")
-    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
-        assert record.size() > 0 && buf.remaining() >= record.size() : record.size();
-
-        int startPos = buf.position();
-
-        buf.put((byte)(record.type().ordinal() + 1));
-
-        putPosition(buf, (FileWALPointer)record.position());
-
-        switch (record.type()) {
-            case PAGE_RECORD:
-                PageSnapshot snap = (PageSnapshot)record;
-
-                buf.putInt(snap.fullPageId().groupId());
-                buf.putLong(snap.fullPageId().pageId());
-                buf.put(snap.pageData());
-
-                break;
-
-            case MEMORY_RECOVERY:
-                MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record;
-
-                buf.putLong(memoryRecoveryRecord.time());
-
-                break;
-
-            case PARTITION_DESTROY:
-                PartitionDestroyRecord partDestroy = (PartitionDestroyRecord)record;
-
-                buf.putInt(partDestroy.groupId());
-                buf.putInt(partDestroy.partitionId());
-
-                break;
-
-            case META_PAGE_INIT:
-                MetaPageInitRecord updRootsRec = (MetaPageInitRecord)record;
-
-                buf.putInt(updRootsRec.groupId());
-                buf.putLong(updRootsRec.pageId());
-
-                buf.putShort((short)updRootsRec.ioType());
-                buf.putShort((short)updRootsRec.ioVersion());
-                buf.putLong(updRootsRec.treeRoot());
-                buf.putLong(updRootsRec.reuseListRoot());
-
-                break;
-
-            case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                MetaPageUpdatePartitionDataRecord partDataRec = (MetaPageUpdatePartitionDataRecord)record;
-
-                buf.putInt(partDataRec.groupId());
-                buf.putLong(partDataRec.pageId());
-
-                buf.putLong(partDataRec.updateCounter());
-                buf.putLong(partDataRec.globalRemoveId());
-                buf.putInt(partDataRec.partitionSize());
-                buf.putLong(partDataRec.countersPageId());
-                buf.put(partDataRec.state());
-                buf.putInt(partDataRec.allocatedIndexCandidate());
-
-                break;
-
-            case CHECKPOINT_RECORD:
-                CheckpointRecord cpRec = (CheckpointRecord)record;
-
-                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
-                    "Invalid WAL record: " + cpRec;
-
-                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
-                UUID cpId = cpRec.checkpointId();
-
-                buf.putLong(cpId.getMostSignificantBits());
-                buf.putLong(cpId.getLeastSignificantBits());
-
-                buf.put(walPtr == null ? (byte)0 : 1);
-
-                if (walPtr != null) {
-                    buf.putLong(walPtr.index());
-                    buf.putInt(walPtr.fileOffset());
-                    buf.putInt(walPtr.length());
-                }
-
-                putCacheStates(buf, cpRec.cacheGroupStates());
-
-                buf.put(cpRec.end() ? (byte)1 : 0);
-
-                break;
-
-            case DATA_RECORD:
-                DataRecord dataRec = (DataRecord)record;
-
-                buf.putInt(dataRec.writeEntries().size());
-
-                for (DataEntry dataEntry : dataRec.writeEntries())
-                    putDataEntry(buf, dataEntry);
-
-                break;
-
-            case HEADER_RECORD:
-                buf.putLong(HeaderRecord.MAGIC);
-
-                buf.putInt(((HeaderRecord)record).version());
-
-                break;
-
-            case DATA_PAGE_INSERT_RECORD:
-                DataPageInsertRecord diRec = (DataPageInsertRecord)record;
-
-                buf.putInt(diRec.groupId());
-                buf.putLong(diRec.pageId());
-
-                buf.putShort((short)diRec.payload().length);
-
-                buf.put(diRec.payload());
-
-                break;
-
-            case DATA_PAGE_UPDATE_RECORD:
-                DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
-
-                buf.putInt(uRec.groupId());
-                buf.putLong(uRec.pageId());
-                buf.putInt(uRec.itemId());
-
-                buf.putShort((short)uRec.payload().length);
-
-                buf.put(uRec.payload());
-
-                break;
-
-            case DATA_PAGE_INSERT_FRAGMENT_RECORD:
-                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
-
-                buf.putInt(difRec.groupId());
-                buf.putLong(difRec.pageId());
-
-                buf.putLong(difRec.lastLink());
-                buf.putInt(difRec.payloadSize());
-                buf.put(difRec.payload());
-
-                break;
-
-            case DATA_PAGE_REMOVE_RECORD:
-                DataPageRemoveRecord drRec = (DataPageRemoveRecord)record;
-
-                buf.putInt(drRec.groupId());
-                buf.putLong(drRec.pageId());
-
-                buf.put((byte)drRec.itemId());
-
-                break;
-
-            case DATA_PAGE_SET_FREE_LIST_PAGE:
-                DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)record;
-
-                buf.putInt(freeListRec.groupId());
-                buf.putLong(freeListRec.pageId());
-
-                buf.putLong(freeListRec.freeListPage());
-
-                break;
-
-            case INIT_NEW_PAGE_RECORD:
-                InitNewPageRecord inpRec = (InitNewPageRecord)record;
-
-                buf.putInt(inpRec.groupId());
-                buf.putLong(inpRec.pageId());
-
-                buf.putShort((short)inpRec.ioType());
-                buf.putShort((short)inpRec.ioVersion());
-                buf.putLong(inpRec.newPageId());
-
-                break;
-
-            case BTREE_META_PAGE_INIT_ROOT:
-                MetaPageInitRootRecord imRec = (MetaPageInitRootRecord)record;
-
-                buf.putInt(imRec.groupId());
-                buf.putLong(imRec.pageId());
-
-                buf.putLong(imRec.rootId());
-
-                break;
-
-            case BTREE_META_PAGE_INIT_ROOT2:
-                MetaPageInitRootInlineRecord imRec2 = (MetaPageInitRootInlineRecord)record;
-
-                buf.putInt(imRec2.groupId());
-                buf.putLong(imRec2.pageId());
-
-                buf.putLong(imRec2.rootId());
-
-                buf.putShort((short)imRec2.inlineSize());
-                break;
-
-            case BTREE_META_PAGE_ADD_ROOT:
-                MetaPageAddRootRecord arRec = (MetaPageAddRootRecord)record;
-
-                buf.putInt(arRec.groupId());
-                buf.putLong(arRec.pageId());
-
-                buf.putLong(arRec.rootId());
-
-                break;
-
-            case BTREE_META_PAGE_CUT_ROOT:
-                MetaPageCutRootRecord crRec = (MetaPageCutRootRecord)record;
-
-                buf.putInt(crRec.groupId());
-                buf.putLong(crRec.pageId());
-
-                break;
-
-            case BTREE_INIT_NEW_ROOT:
-                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
-
-                buf.putInt(riRec.groupId());
-                buf.putLong(riRec.pageId());
-
-                buf.putLong(riRec.rootId());
-                buf.putShort((short)riRec.io().getType());
-                buf.putShort((short)riRec.io().getVersion());
-                buf.putLong(riRec.leftId());
-                buf.putLong(riRec.rightId());
-
-                putRow(buf, riRec.rowBytes());
-
-                break;
-
-            case BTREE_PAGE_RECYCLE:
-                RecycleRecord recRec = (RecycleRecord)record;
-
-                buf.putInt(recRec.groupId());
-                buf.putLong(recRec.pageId());
-
-                buf.putLong(recRec.newPageId());
-
-                break;
-
-            case BTREE_PAGE_INSERT:
-                InsertRecord<?> inRec = (InsertRecord<?>)record;
-
-                buf.putInt(inRec.groupId());
-                buf.putLong(inRec.pageId());
-
-                buf.putShort((short)inRec.io().getType());
-                buf.putShort((short)inRec.io().getVersion());
-                buf.putShort((short)inRec.index());
-                buf.putLong(inRec.rightId());
-
-                putRow(buf, inRec.rowBytes());
-
-                break;
-
-            case BTREE_FIX_LEFTMOST_CHILD:
-                FixLeftmostChildRecord flRec = (FixLeftmostChildRecord)record;
-
-                buf.putInt(flRec.groupId());
-                buf.putLong(flRec.pageId());
-
-                buf.putLong(flRec.rightId());
-
-                break;
-
-            case BTREE_FIX_COUNT:
-                FixCountRecord fcRec = (FixCountRecord)record;
-
-                buf.putInt(fcRec.groupId());
-                buf.putLong(fcRec.pageId());
-
-                buf.putShort((short)fcRec.count());
-
-                break;
-
-            case BTREE_PAGE_REPLACE:
-                ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
-
-                buf.putInt(rRec.groupId());
-                buf.putLong(rRec.pageId());
-
-                buf.putShort((short)rRec.io().getType());
-                buf.putShort((short)rRec.io().getVersion());
-                buf.putShort((short)rRec.index());
-
-                putRow(buf, rRec.rowBytes());
-
-                break;
-
-            case BTREE_PAGE_REMOVE:
-                RemoveRecord rmRec = (RemoveRecord)record;
-
-                buf.putInt(rmRec.groupId());
-                buf.putLong(rmRec.pageId());
-
-                buf.putShort((short)rmRec.index());
-                buf.putShort((short)rmRec.count());
-
-                break;
-
-            case BTREE_PAGE_INNER_REPLACE:
-                InnerReplaceRecord<?> irRec = (InnerReplaceRecord<?>)record;
-
-                buf.putInt(irRec.groupId());
-                buf.putLong(irRec.pageId());
-
-                buf.putShort((short)irRec.destinationIndex());
-                buf.putLong(irRec.sourcePageId());
-                buf.putShort((short)irRec.sourceIndex());
-                buf.putLong(irRec.removeId());
-
-                break;
-
-            case BTREE_FORWARD_PAGE_SPLIT:
-                SplitForwardPageRecord sfRec = (SplitForwardPageRecord)record;
-
-                buf.putInt(sfRec.groupId());
-                buf.putLong(sfRec.pageId());
-
-                buf.putLong(sfRec.forwardId());
-                buf.putShort((short)sfRec.ioType());
-                buf.putShort((short)sfRec.ioVersion());
-                buf.putLong(sfRec.sourcePageId());
-                buf.putShort((short)sfRec.middleIndex());
-                buf.putShort((short)sfRec.count());
-
-                break;
-
-            case BTREE_EXISTING_PAGE_SPLIT:
-                SplitExistingPageRecord seRec = (SplitExistingPageRecord)record;
-
-                buf.putInt(seRec.groupId());
-                buf.putLong(seRec.pageId());
-
-                buf.putShort((short)seRec.middleIndex());
-                buf.putLong(seRec.forwardId());
-
-                break;
-
-            case BTREE_PAGE_MERGE:
-                MergeRecord<?> mRec = (MergeRecord<?>)record;
-
-                buf.putInt(mRec.groupId());
-                buf.putLong(mRec.pageId());
-
-                buf.putLong(mRec.parentId());
-                buf.putShort((short)mRec.parentIndex());
-                buf.putLong(mRec.rightId());
-                buf.put((byte)(mRec.isEmptyBranch() ? 1 : 0));
-
-                break;
-
-            case PAGES_LIST_SET_NEXT:
-                PagesListSetNextRecord plNextRec = (PagesListSetNextRecord)record;
-
-                buf.putInt(plNextRec.groupId());
-                buf.putLong(plNextRec.pageId());
-
-                buf.putLong(plNextRec.nextPageId());
-
-                break;
-
-            case PAGES_LIST_SET_PREVIOUS:
-                PagesListSetPreviousRecord plPrevRec = (PagesListSetPreviousRecord)record;
-
-                buf.putInt(plPrevRec.groupId());
-                buf.putLong(plPrevRec.pageId());
-
-                buf.putLong(plPrevRec.previousPageId());
-
-                break;
-
-            case PAGES_LIST_INIT_NEW_PAGE:
-                PagesListInitNewPageRecord plNewRec = (PagesListInitNewPageRecord)record;
-
-                buf.putInt(plNewRec.groupId());
-                buf.putLong(plNewRec.pageId());
-                buf.putInt(plNewRec.ioType());
-                buf.putInt(plNewRec.ioVersion());
-                buf.putLong(plNewRec.newPageId());
-
-                buf.putLong(plNewRec.previousPageId());
-                buf.putLong(plNewRec.dataPageId());
-
-                break;
-
-            case PAGES_LIST_ADD_PAGE:
-                PagesListAddPageRecord plAddRec = (PagesListAddPageRecord)record;
-
-                buf.putInt(plAddRec.groupId());
-                buf.putLong(plAddRec.pageId());
-
-                buf.putLong(plAddRec.dataPageId());
-
-                break;
-
-            case PAGES_LIST_REMOVE_PAGE:
-                PagesListRemovePageRecord plRmvRec = (PagesListRemovePageRecord)record;
-
-                buf.putInt(plRmvRec.groupId());
-                buf.putLong(plRmvRec.pageId());
-
-                buf.putLong(plRmvRec.removedPageId());
-
-                break;
-
-            case BTREE_FIX_REMOVE_ID:
-                FixRemoveId frRec = (FixRemoveId)record;
-
-                buf.putInt(frRec.groupId());
-                buf.putLong(frRec.pageId());
-
-                buf.putLong(frRec.removeId());
-
-                break;
-
-            case TRACKING_PAGE_DELTA:
-                TrackingPageDeltaRecord tpDelta = (TrackingPageDeltaRecord)record;
-
-                buf.putInt(tpDelta.groupId());
-                buf.putLong(tpDelta.pageId());
-
-                buf.putLong(tpDelta.pageIdToMark());
-                buf.putLong(tpDelta.nextSnapshotId());
-                buf.putLong(tpDelta.lastSuccessfulSnapshotId());
-
-                break;
-
-            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
-                MetaPageUpdateNextSnapshotId mpUpdateNextSnapshotId = (MetaPageUpdateNextSnapshotId)record;
-
-                buf.putInt(mpUpdateNextSnapshotId.groupId());
-                buf.putLong(mpUpdateNextSnapshotId.pageId());
-
-                buf.putLong(mpUpdateNextSnapshotId.nextSnapshotId());
-
-                break;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
-                MetaPageUpdateLastSuccessfulFullSnapshotId mpUpdateLastSuccFullSnapshotId =
-                    (MetaPageUpdateLastSuccessfulFullSnapshotId)record;
-
-                buf.putInt(mpUpdateLastSuccFullSnapshotId.groupId());
-                buf.putLong(mpUpdateLastSuccFullSnapshotId.pageId());
-
-                buf.putLong(mpUpdateLastSuccFullSnapshotId.lastSuccessfulFullSnapshotId());
-
-                break;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
-                MetaPageUpdateLastSuccessfulSnapshotId mpUpdateLastSuccSnapshotId =
-                    (MetaPageUpdateLastSuccessfulSnapshotId)record;
-
-                buf.putInt(mpUpdateLastSuccSnapshotId.groupId());
-                buf.putLong(mpUpdateLastSuccSnapshotId.pageId());
-
-                buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotId());
-                buf.putLong(mpUpdateLastSuccSnapshotId.lastSuccessfulSnapshotTag());
-
-                break;
-
-            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
-                MetaPageUpdateLastAllocatedIndex mpUpdateLastAllocatedIdx =
-                        (MetaPageUpdateLastAllocatedIndex) record;
-
-                buf.putInt(mpUpdateLastAllocatedIdx.groupId());
-                buf.putLong(mpUpdateLastAllocatedIdx.pageId());
-
-                buf.putInt(mpUpdateLastAllocatedIdx.lastAllocatedIndex());
-
-                break;
-
-            case PART_META_UPDATE_STATE:
-                PartitionMetaStateRecord partMetaStateRecord = (PartitionMetaStateRecord) record;
-
-                buf.putInt(partMetaStateRecord.groupId());
-
-                buf.putInt(partMetaStateRecord.partitionId());
-
-                buf.put(partMetaStateRecord.state());
-
-                buf.putLong(partMetaStateRecord.updateCounter());
-
-                break;
-
-            case PAGE_LIST_META_RESET_COUNT_RECORD:
-                PageListMetaResetCountRecord pageListMetaResetCntRecord = (PageListMetaResetCountRecord) record;
-
-                buf.putInt(pageListMetaResetCntRecord.groupId());
-                buf.putLong(pageListMetaResetCntRecord.pageId());
-
-                break;
-
-            case TX_RECORD:
-                txRecordSerializer.writeTxRecord((TxRecord) record, buf);
-
-                break;
-
-            default:
-                throw new UnsupportedOperationException("Type: " + record.type());
-        }
-
-        if (!skipCrc) {
-            int curPos = buf.position();
-
-            buf.position(startPos);
-
-            // This call will move buffer position to the end of the record again.
-            int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos);
-
-            buf.putInt(crcVal);
-        }
-        else
-            buf.putInt(0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws  IOException, IgniteCheckedException {
-        long startPos = -1;
-
-        try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
-            startPos = in0.position();
-
-            WALRecord res = readRecord(in, expPtr);
-
-            assert res != null;
-
-            int size = (int) (in0.position() - startPos + CRC_SIZE);
-
-            res.size(size); // Account for CRC which will be read afterwards.
-
-            if (writePointer && expPtr instanceof FileWALPointer) {
-                FileWALPointer ptr = (FileWALPointer) expPtr;
-
-                res.position(new FileWALPointer(ptr.index(), ptr.fileOffset(), size));
-            }
-
-            return res;
-        }
-        catch (EOFException | SegmentEofException e) {
-            throw e;
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos, e);
-        }
-    }
-
-    /**
-     * Loads record from input, does not read CRC value
-     *
-     * @param in Input to read record from
-     * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
-     * @throws SegmentEofException if end of WAL segment reached
-     */
-    private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
-        int type = in.readUnsignedByte();
-
-        if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
-            throw new SegmentEofException("Reached logical end of the segment", null);
-
-        FileWALPointer ptr = readPosition(in);
-
-        if (!F.eq(ptr, expPtr))
-            throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
-                ", readPtr=" + ptr + ']', null);
-
-        RecordType recType = RecordType.fromOrdinal(type - 1);
-
-        if (recType == null)
-            throw new IOException("Unknown record type: " + type);
-
-        WALRecord res;
-
-        switch (recType) {
-            case PAGE_RECORD:
-                byte[] arr = new byte[pageSize];
-
-                int cacheId = in.readInt();
-                long pageId = in.readLong();
-
-                in.readFully(arr);
-
-                res = new PageSnapshot(new FullPageId(pageId, cacheId), arr);
-
-                break;
-
-            case CHECKPOINT_RECORD:
-                long msb = in.readLong();
-                long lsb = in.readLong();
-                boolean hasPtr = in.readByte() != 0;
-                int idx = hasPtr ? in.readInt() : 0;
-                int offset = hasPtr ? in.readInt() : 0;
-                int len = hasPtr ? in.readInt() : 0;
-
-                Map<Integer, CacheState> states = readPartitionStates(in);
-
-                boolean end = in.readByte() != 0;
-
-                FileWALPointer walPtr = hasPtr ? new FileWALPointer(idx, offset, len) : null;
-
-                CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end);
-
-                cpRec.cacheGroupStates(states);
-
-                res = cpRec;
-
-                break;
-
-            case META_PAGE_INIT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int ioType = in.readUnsignedShort();
-                int ioVer = in.readUnsignedShort();
-                long treeRoot = in.readLong();
-                long reuseListRoot = in.readLong();
-
-                res = new MetaPageInitRecord(cacheId, pageId, ioType, ioVer, treeRoot, reuseListRoot);
-
-                break;
-
-            case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long updCntr = in.readLong();
-                long rmvId = in.readLong();
-                int partSize = in.readInt();
-                long countersPageId = in.readLong();
-                byte state = in.readByte();
-                int allocatedIdxCandidate = in.readInt();
-
-                res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate);
-
-                break;
-
-            case MEMORY_RECOVERY:
-                long ts = in.readLong();
-
-                res = new MemoryRecoveryRecord(ts);
-
-                break;
-
-            case PARTITION_DESTROY:
-                cacheId = in.readInt();
-                int partId = in.readInt();
-
-                res = new PartitionDestroyRecord(cacheId, partId);
-
-                break;
-
-            case DATA_RECORD:
-                int entryCnt = in.readInt();
-
-                List<DataEntry> entries = new ArrayList<>(entryCnt);
-
-                for (int i = 0; i < entryCnt; i++)
-                    entries.add(readDataEntry(in));
-
-                res = new DataRecord(entries);
-
-                break;
-
-            case HEADER_RECORD:
-                long magic = in.readLong();
-
-                if (magic != HeaderRecord.MAGIC)
-                    throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) +
-                        ", actual=" + U.hexLong(magic) + ']');
-
-                int ver = in.readInt();
-
-                res = new HeaderRecord(ver);
-
-                break;
-
-            case DATA_PAGE_INSERT_RECORD: {
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int size = in.readUnsignedShort();
-
-                in.ensure(size);
-
-                byte[] payload = new byte[size];
-
-                in.readFully(payload);
-
-                res = new DataPageInsertRecord(cacheId, pageId, payload);
-
-                break;
-            }
-
-            case DATA_PAGE_UPDATE_RECORD: {
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int itemId = in.readInt();
-
-                int size = in.readUnsignedShort();
-
-                in.ensure(size);
-
-                byte[] payload = new byte[size];
-
-                in.readFully(payload);
-
-                res = new DataPageUpdateRecord(cacheId, pageId, itemId, payload);
-
-                break;
-            }
-
-            case DATA_PAGE_INSERT_FRAGMENT_RECORD: {
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                final long lastLink = in.readLong();
-                final int payloadSize = in.readInt();
-
-                final byte[] payload = new byte[payloadSize];
-
-                in.readFully(payload);
-
-                res = new DataPageInsertFragmentRecord(cacheId, pageId, payload, lastLink);
-
-                break;
-            }
-
-            case DATA_PAGE_REMOVE_RECORD:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int itemId = in.readUnsignedByte();
-
-                res = new DataPageRemoveRecord(cacheId, pageId, itemId);
-
-                break;
-
-            case DATA_PAGE_SET_FREE_LIST_PAGE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long freeListPage = in.readLong();
-
-                res = new DataPageSetFreeListPageRecord(cacheId, pageId, freeListPage);
-
-                break;
-
-            case INIT_NEW_PAGE_RECORD:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                ioType = in.readUnsignedShort();
-                ioVer = in.readUnsignedShort();
-                long virtualPageId = in.readLong();
-
-                res = new InitNewPageRecord(cacheId, pageId, ioType, ioVer, virtualPageId);
-
-                break;
-
-            case BTREE_META_PAGE_INIT_ROOT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long rootId = in.readLong();
-
-                res = new MetaPageInitRootRecord(cacheId, pageId, rootId);
-
-                break;
-
-            case BTREE_META_PAGE_INIT_ROOT2:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long rootId2 = in.readLong();
-                int inlineSize = in.readShort();
-
-                res = new MetaPageInitRootInlineRecord(cacheId, pageId, rootId2, inlineSize);
-
-                break;
-
-            case BTREE_META_PAGE_ADD_ROOT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                rootId = in.readLong();
-
-                res = new MetaPageAddRootRecord(cacheId, pageId, rootId);
-
-                break;
-
-            case BTREE_META_PAGE_CUT_ROOT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                res = new MetaPageCutRootRecord(cacheId, pageId);
-
-                break;
-
-            case BTREE_INIT_NEW_ROOT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                rootId = in.readLong();
-                ioType = in.readUnsignedShort();
-                ioVer = in.readUnsignedShort();
-                long leftId = in.readLong();
-                long rightId = in.readLong();
-
-                BPlusIO<?> io = BPlusIO.getBPlusIO(ioType, ioVer);
-
-                byte[] rowBytes = new byte[io.getItemSize()];
-
-                in.readFully(rowBytes);
-
-                res = new NewRootInitRecord<>(cacheId, pageId, rootId, (BPlusInnerIO<?>)io, leftId, rowBytes, rightId);
-
-                break;
-
-            case BTREE_PAGE_RECYCLE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long newPageId = in.readLong();
-
-                res = new RecycleRecord(cacheId, pageId, newPageId);
-
-                break;
-
-            case BTREE_PAGE_INSERT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                ioType = in.readUnsignedShort();
-                ioVer = in.readUnsignedShort();
-                int itemIdx = in.readUnsignedShort();
-                rightId = in.readLong();
-
-                io = BPlusIO.getBPlusIO(ioType, ioVer);
-
-                rowBytes = new byte[io.getItemSize()];
-
-                in.readFully(rowBytes);
-
-                res = new InsertRecord<>(cacheId, pageId, io, itemIdx, rowBytes, rightId);
-
-                break;
-
-            case BTREE_FIX_LEFTMOST_CHILD:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                rightId = in.readLong();
-
-                res = new FixLeftmostChildRecord(cacheId, pageId, rightId);
-
-                break;
-
-            case BTREE_FIX_COUNT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int cnt = in.readUnsignedShort();
-
-                res = new FixCountRecord(cacheId, pageId, cnt);
-
-                break;
-
-            case BTREE_PAGE_REPLACE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                ioType = in.readUnsignedShort();
-                ioVer = in.readUnsignedShort();
-                itemIdx = in.readUnsignedShort();
-
-                io = BPlusIO.getBPlusIO(ioType, ioVer);
-
-                rowBytes = new byte[io.getItemSize()];
-
-                in.readFully(rowBytes);
-
-                res = new ReplaceRecord<>(cacheId, pageId, io, rowBytes, itemIdx);
-
-                break;
-
-            case BTREE_PAGE_REMOVE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                itemIdx = in.readUnsignedShort();
-                cnt = in.readUnsignedShort();
-
-                res = new RemoveRecord(cacheId, pageId, itemIdx, cnt);
-
-                break;
-
-            case BTREE_PAGE_INNER_REPLACE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                int dstIdx = in.readUnsignedShort();
-                long srcPageId = in.readLong();
-                int srcIdx = in.readUnsignedShort();
-                rmvId = in.readLong();
-
-                res = new InnerReplaceRecord<>(cacheId, pageId, dstIdx, srcPageId, srcIdx, rmvId);
-
-                break;
-
-            case BTREE_FORWARD_PAGE_SPLIT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long fwdId = in.readLong();
-                ioType = in.readUnsignedShort();
-                ioVer = in.readUnsignedShort();
-                srcPageId = in.readLong();
-                int mid = in.readUnsignedShort();
-                cnt = in.readUnsignedShort();
-
-                res = new SplitForwardPageRecord(cacheId, pageId, fwdId, ioType, ioVer, srcPageId, mid, cnt);
-
-                break;
-
-            case BTREE_EXISTING_PAGE_SPLIT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                mid = in.readUnsignedShort();
-                fwdId = in.readLong();
-
-                res = new SplitExistingPageRecord(cacheId, pageId, mid, fwdId);
-
-                break;
-
-            case BTREE_PAGE_MERGE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long prntId = in.readLong();
-                int prntIdx = in.readUnsignedShort();
-                rightId = in.readLong();
-                boolean emptyBranch = in.readBoolean();
-
-                res = new MergeRecord<>(cacheId, pageId, prntId, prntIdx, rightId, emptyBranch);
-
-                break;
-
-            case BTREE_FIX_REMOVE_ID:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                rmvId = in.readLong();
-
-                res = new FixRemoveId(cacheId, pageId, rmvId);
-
-                break;
-
-            case PAGES_LIST_SET_NEXT:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-                long nextPageId = in.readLong();
-
-                res = new PagesListSetNextRecord(cacheId, pageId, nextPageId);
-
-                break;
-
-            case PAGES_LIST_SET_PREVIOUS:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-                long prevPageId = in.readLong();
-
-                res = new PagesListSetPreviousRecord(cacheId, pageId, prevPageId);
-
-                break;
-
-            case PAGES_LIST_INIT_NEW_PAGE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-                ioType = in.readInt();
-                ioVer = in.readInt();
-                newPageId = in.readLong();
-                prevPageId = in.readLong();
-                long addDataPageId = in.readLong();
-
-                res = new PagesListInitNewPageRecord(cacheId, pageId, ioType, ioVer, newPageId, prevPageId, addDataPageId);
-
-                break;
-
-            case PAGES_LIST_ADD_PAGE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-                long dataPageId = in.readLong();
-
-                res = new PagesListAddPageRecord(cacheId, pageId, dataPageId);
-
-                break;
-
-            case PAGES_LIST_REMOVE_PAGE:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-                long rmvdPageId = in.readLong();
-
-                res = new PagesListRemovePageRecord(cacheId, pageId, rmvdPageId);
-
-                break;
-
-            case TRACKING_PAGE_DELTA:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long pageIdToMark = in.readLong();
-                long nextSnapshotId0 = in.readLong();
-                long lastSuccessfulSnapshotId0 = in.readLong();
-
-                res = new TrackingPageDeltaRecord(cacheId, pageId, pageIdToMark, nextSnapshotId0, lastSuccessfulSnapshotId0);
-
-                break;
-
-            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long nextSnapshotId = in.readLong();
-
-                res = new MetaPageUpdateNextSnapshotId(cacheId, pageId, nextSnapshotId);
-
-                break;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long lastSuccessfulFullSnapshotId = in.readLong();
-
-                res = new MetaPageUpdateLastSuccessfulFullSnapshotId(cacheId, pageId, lastSuccessfulFullSnapshotId);
-
-                break;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
-                cacheId = in.readInt();
-                pageId = in.readLong();
-
-                long lastSuccessfulSnapshotId = in.readLong();
-                long lastSuccessfulSnapshotTag = in.readLong();
-
-                res = new MetaPageUpdateLastSuccessfulSnapshotId(cacheId, pageId, lastSuccessfulSnapshotId, lastSuccessfulSnapshotTag);
+    /** Skip CRC calculation/check flag. */
+    public static boolean SKIP_CRC = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
 
-                break;
+    /** V1 data serializer. */
+    private final RecordDataV1Serializer dataSerializer;
 
-            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
-                cacheId = in.readInt();
-                pageId = in.readLong();
+    /** Write pointer. */
+    private final boolean writePointer;
 
-                int lastAllocatedIdx = in.readInt();
+    /** Record I/O that computes size of, reads and writes records together with their headers. */
+    private final RecordIO recordIO = new RecordIO() {
 
-                res = new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, lastAllocatedIdx);
+        /** {@inheritDoc} */
+        @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException {
+            return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE;
+        }
 
-                break;
+        /** {@inheritDoc} */
+        @Override public WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
+            RecordType recType = readRecordType(in);
 
-            case PART_META_UPDATE_STATE:
-                cacheId = in.readInt();
-                partId = in.readInt();
+            if (recType == RecordType.SWITCH_SEGMENT_RECORD)
+                throw new SegmentEofException("Reached end of segment", null);
 
-                state = in.readByte();
+            FileWALPointer ptr = readPosition(in);
 
-                long updateCounter = in.readLong();
+            if (!F.eq(ptr, expPtr))
+                throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr +
+                        ", readPtr=" + ptr + ']', null);
 
-                res = new PartitionMetaStateRecord(cacheId, partId, GridDhtPartitionState.fromOrdinal(state), updateCounter);
+            return dataSerializer.readRecord(recType, in);
+        }
 
-                break;
+        /** {@inheritDoc} */
+        @Override public void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+            // Write record type.
+            putRecordType(buf, record);
 
-            case PAGE_LIST_META_RESET_COUNT_RECORD:
-                cacheId = in.readInt();
-                pageId = in.readLong();
+            // Write record file position.
+            putPositionOfRecord(buf, record);
 
-                res = new PageListMetaResetCountRecord(cacheId, pageId);
-                break;
+            // Write record data.
+            dataSerializer.writeRecord(record, buf);
+        }
+    };
 
-            case SWITCH_SEGMENT_RECORD:
-                throw new EOFException("END OF SEGMENT");
+    /**
+     * Create an instance of V1 serializer.
+     *
+     * @param dataSerializer V1 data serializer.
+     * @param writePointer Write pointer.
+     */
+    public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer) {
+        this.dataSerializer = dataSerializer;
+        this.writePointer = writePointer;
+    }
 
-            case TX_RECORD:
-                res = txRecordSerializer.readTxRecord(in);
+    /** {@inheritDoc} */
+    @Override public int version() {
+        return 1;
+    }
 
-                break;
+    /** {@inheritDoc} */
+    @Override public boolean writePointer() {
+        return writePointer;
+    }
 
-            default:
-                throw new UnsupportedOperationException("Type: " + recType);
-        }
+    /** {@inheritDoc} */
+    @SuppressWarnings("CastConflictsWithInstanceof")
+    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+        writeWithCrc(record, buf, recordIO);
+    }
 
-        return res;
+    /** {@inheritDoc} */
+    @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws  IOException, IgniteCheckedException {
+        return readWithCrc(in0, expPtr, recordIO);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("CastConflictsWithInstanceof")
     @Override public int size(WALRecord record) throws IgniteCheckedException {
-        int commonFields = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE;
-
-        switch (record.type()) {
-            case PAGE_RECORD:
-                assert record instanceof PageSnapshot;
-
-                PageSnapshot pageRec = (PageSnapshot)record;
-
-                return commonFields + pageRec.pageData().length + 12;
-
-            case CHECKPOINT_RECORD:
-                CheckpointRecord cpRec = (CheckpointRecord)record;
-
-                assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer :
-                    "Invalid WAL record: " + cpRec;
-
-                int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates());
-
-                FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark();
-
-                return commonFields + 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
-
-            case META_PAGE_INIT:
-                return commonFields + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2  + /*ioVer*/2 +  /*tree root*/8 + /*reuse root*/8;
-
-            case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                return commonFields + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1
-                    + /*allocatedIdxCandidate*/ 4;
-
-            case MEMORY_RECOVERY:
-                return commonFields + 8;
-
-            case PARTITION_DESTROY:
-                return commonFields + /*cacheId*/4 + /*partId*/4;
-
-            case DATA_RECORD:
-                DataRecord dataRec = (DataRecord)record;
-
-                return commonFields + 4 + dataSize(dataRec);
-
-            case HEADER_RECORD:
-                return HEADER_RECORD_SIZE;
-
-            case DATA_PAGE_INSERT_RECORD:
-                DataPageInsertRecord diRec = (DataPageInsertRecord)record;
-
-                return commonFields + 4 + 8 + 2 + diRec.payload().length;
-
-            case DATA_PAGE_UPDATE_RECORD:
-                DataPageUpdateRecord uRec = (DataPageUpdateRecord)record;
-
-                return commonFields + 4 + 8 + 2 + 4 +
-                    uRec.payload().length;
-
-            case DATA_PAGE_INSERT_FRAGMENT_RECORD:
-                final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record;
-
-                return commonFields + 4 + 8 + 8 + 4 + difRec.payloadSize();
-
-            case DATA_PAGE_REMOVE_RECORD:
-                return commonFields + 4 + 8 + 1;
-
-            case DATA_PAGE_SET_FREE_LIST_PAGE:
-                return commonFields + 4 + 8 + 8;
-
-            case INIT_NEW_PAGE_RECORD:
-                return commonFields + 4 + 8 + 2 + 2 + 8;
-
-            case BTREE_META_PAGE_INIT_ROOT:
-                return commonFields + 4 + 8 + 8;
-
-            case BTREE_META_PAGE_INIT_ROOT2:
-                return commonFields + 4 + 8 + 8 + 2;
-
-            case BTREE_META_PAGE_ADD_ROOT:
-                return commonFields + 4 + 8 + 8;
-
-            case BTREE_META_PAGE_CUT_ROOT:
-                return commonFields + 4 + 8;
-
-            case BTREE_INIT_NEW_ROOT:
-                NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record;
-
-                return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize();
-
-            case BTREE_PAGE_RECYCLE:
-                return commonFields + 4 + 8 + 8;
-
-            case BTREE_PAGE_INSERT:
-                InsertRecord<?> inRec = (InsertRecord<?>)record;
-
-                return commonFields + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize();
-
-            case BTREE_FIX_LEFTMOST_CHILD:
-                return commonFields + 4 + 8 + 8;
-
-            case BTREE_FIX_COUNT:
-                return commonFields + 4 + 8 + 2;
-
-            case BTREE_PAGE_REPLACE:
-                ReplaceRecord<?> rRec = (ReplaceRecord<?>)record;
-
-                return commonFields + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize();
-
-            case BTREE_PAGE_REMOVE:
-                return commonFields + 4 + 8 + 2 + 2;
-
-            case BTREE_PAGE_INNER_REPLACE:
-                return commonFields + 4 + 8 + 2 + 8 + 2 + 8;
-
-            case BTREE_FORWARD_PAGE_SPLIT:
-                return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2;
-
-            case BTREE_EXISTING_PAGE_SPLIT:
-                return commonFields + 4 + 8 + 2 + 8;
-
-            case BTREE_PAGE_MERGE:
-                return commonFields + 4 + 8 + 8 + 2 + 8 + 1;
-
-            case BTREE_FIX_REMOVE_ID:
-                return commonFields + 4 + 8 + 8;
-
-            case PAGES_LIST_SET_NEXT:
-                return commonFields + 4 + 8 + 8;
-
-            case PAGES_LIST_SET_PREVIOUS:
-                return commonFields + 4 + 8 + 8;
-
-            case PAGES_LIST_INIT_NEW_PAGE:
-                return commonFields + 4 + 8 + 4 + 4 + 8 + 8 + 8;
-
-            case PAGES_LIST_ADD_PAGE:
-                return commonFields + 4 + 8 + 8;
-
-            case PAGES_LIST_REMOVE_PAGE:
-                return commonFields + 4 + 8 + 8;
-
-            case TRACKING_PAGE_DELTA:
-                return commonFields + 4 + 8 + 8 + 8 + 8;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
-                return commonFields + 4 + 8 + 8 + 8;
-
-            case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
-                return commonFields + 4 + 8 + 8;
-
-            case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
-                return commonFields + 4 + 8 + 8;
-
-            case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
-                return commonFields + 4 + 8 + 4;
-
-            case PART_META_UPDATE_STATE:
-                return commonFields + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8;
-
-            case PAGE_LIST_META_RESET_COUNT_RECORD:
-                return commonFields + /*cacheId*/ 4 + /*pageId*/ 8;
-
-            case SWITCH_SEGMENT_RECORD:
-                return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead
-
-            case TX_RECORD:
-                return commonFields + txRecordSerializer.sizeOfTxRecord((TxRecord) record);
-
-            default:
-                throw new UnsupportedOperationException("Type: " + record.type());
-        }
+        return recordIO.sizeWithHeaders(record);
     }
 
     /**
@@ -1463,213 +170,106 @@ public class RecordV1Serializer implements RecordSerializer {
     }
 
     /**
-     * @param dataRec Data record to serialize.
-     * @return Full data record size.
-     * @throws IgniteCheckedException If failed to obtain the length of one of the entries.
-     */
-    private int dataSize(DataRecord dataRec) throws IgniteCheckedException {
-        int sz = 0;
-
-        for (DataEntry entry : dataRec.writeEntries())
-            sz += entrySize(entry);
-
-        return sz;
-    }
-
-    /**
-     * @param entry Entry to get size for.
-     * @return Entry size.
-     * @throws IgniteCheckedException If failed to get key or value bytes length.
+     * Writes record file position to given {@code buf}.
+     *
+     * @param buf Buffer to write record file position.
+     * @param record WAL record.
      */
-    private int entrySize(DataEntry entry) throws IgniteCheckedException {
-        GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId());
-        CacheObjectContext coCtx = cctx.cacheObjectContext();
-
-        return
-            /*cache ID*/4 +
-            /*key*/entry.key().valueBytesLength(coCtx) +
-            /*value*/(entry.value() == null ? 4 : entry.value().valueBytesLength(coCtx)) +
-            /*op*/1 +
-            /*near xid ver*/CacheVersionIO.size(entry.nearXidVersion(), true) +
-            /*write ver*/CacheVersionIO.size(entry.writeVersion(), false) +
-            /*part ID*/4 +
-            /*expire Time*/8 +
-            /*part cnt*/8;
+    public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
+        putPosition(buf, (FileWALPointer) record.position());
     }
 
     /**
-     * @param states Partition states.
-     * @return Size required to write partition states.
+     * Writes record type to given {@code buf}.
+     *
+     * @param buf Buffer to write record type.
+     * @param record WAL record.
      */
-    private int cacheStatesSize(Map<Integer, CacheState> states) {
-        // Need 4 bytes for the number of caches.
-        int size = 2;
-
-        for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
-            // Cache ID.
-            size += 4;
-
-            // Need 2 bytes for the number of partitions.
-            size += 2;
-
-            CacheState state = entry.getValue();
-
-            // 2 bytes partition ID, size and counter per partition.
-            size += 18 * state.size();
-        }
-
-        return size;
+    public static void putRecordType(ByteBuffer buf, WALRecord record) {
+        buf.put((byte)(record.type().ordinal() + 1));
     }
 
     /**
-     * @param buf Buffer to write to.
-     * @param entry Data entry.
+     * Reads record type from given {@code in}.
+     *
+     * @param in Buffer to read record type.
+     * @return Record type.
+     * @throws IgniteCheckedException If logical end of segment is reached (thrown as {@link SegmentEofException}).
+     * @throws IOException In case of I/O problems or if the record type is unknown.
      */
-    private void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
-        buf.putInt(entry.cacheId());
-
-        if (!entry.key().putValue(buf))
-            throw new AssertionError();
+    public static RecordType readRecordType(DataInput in) throws IgniteCheckedException, IOException {
+        int type = in.readUnsignedByte();
 
-        if (entry.value() == null)
-            buf.putInt(-1);
-        else if (!entry.value().putValue(buf))
-            throw new AssertionError();
+        if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
+            throw new SegmentEofException("Reached logical end of the segment", null);
 
-        buf.put((byte)entry.op().ordinal());
+        RecordType recType = RecordType.fromOrdinal(type - 1);
 
-        putVersion(buf, entry.nearXidVersion(), true);
-        putVersion(buf, entry.writeVersion(), false);
+        if (recType == null)
+            throw new IOException("Unknown record type: " + type);
 
-        buf.putInt(entry.partitionId());
-        buf.putLong(entry.partitionCounter());
-        buf.putLong(entry.expireTime());
+        return recType;
     }
 
     /**
-     * @param states Cache states.
+     * Reads record from file {@code in0} and validates CRC of record.
+     *
+     * @param in0 File input.
+     * @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file.
+     * @param reader Record reader I/O interface.
+     * @return WAL record.
+     * @throws EOFException In case of end of file.
+     * @throws IgniteCheckedException If it's unable to read record.
      */
-    private void putCacheStates(ByteBuffer buf, Map<Integer, CacheState> states) {
-        buf.putShort((short)states.size());
+    public static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException {
+        long startPos = -1;
 
-        for (Map.Entry<Integer, CacheState> entry : states.entrySet()) {
-            buf.putInt(entry.getKey());
+        try (FileInput.Crc32CheckingFileInput in = in0.startRead(SKIP_CRC)) {
+            startPos = in0.position();
 
-            CacheState state = entry.getValue();
+            WALRecord res = reader.readWithHeaders(in, expPtr);
 
-            // Need 2 bytes for the number of partitions.
-            buf.putShort((short)state.size());
+            assert res != null;
 
-            for (int i = 0; i < state.size(); i++) {
-                buf.putShort((short)state.partitionByIndex(i));
+            res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards.
 
-                buf.putLong(state.partitionSizeByIndex(i));
-                buf.putLong(state.partitionCounterByIndex(i));
-            }
+            return res;
         }
-    }
-
-    /**
-     * @param in Input to read from.
-     * @return Read entry.
-     */
-    private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
-        int cacheId = in.readInt();
-
-        int keySize = in.readInt();
-        byte keyType = in.readByte();
-        byte[] keyBytes = new byte[keySize];
-        in.readFully(keyBytes);
-
-        int valSize = in.readInt();
-
-        byte valType = 0;
-        byte[] valBytes = null;
-
-        if (valSize >= 0) {
-            valType = in.readByte();
-            valBytes = new byte[valSize];
-            in.readFully(valBytes);
+        catch (EOFException | SegmentEofException | WalSegmentTailReachedException e) {
+            throw e;
         }
-
-        byte ord = in.readByte();
-
-        GridCacheOperation op = GridCacheOperation.fromOrdinal(ord & 0xFF);
-
-        GridCacheVersion nearXidVer = readVersion(in, true);
-        GridCacheVersion writeVer = readVersion(in, false);
-
-        int partId = in.readInt();
-        long partCntr = in.readLong();
-        long expireTime = in.readLong();
-
-        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
-        if (cacheCtx != null) {
-            CacheObjectContext coCtx = cacheCtx.cacheObjectContext();
-
-            KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);
-            CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null;
-
-            return new DataEntry(
-                cacheId,
-                key,
-                val,
-                op,
-                nearXidVer,
-                writeVer,
-                expireTime,
-                partId,
-                partCntr
-            );
+        catch (Exception e) {
+            throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos, e);
         }
-        else
-            return new LazyDataEntry(
-                cctx,
-                cacheId,
-                keyType,
-                keyBytes,
-                valType,
-                valBytes,
-                op,
-                nearXidVer,
-                writeVer,
-                expireTime,
-                partId,
-                partCntr);
     }
 
     /**
-     * @param buf Buffer to read from.
-     * @return Read map.
+     * Writes record with calculated CRC to buffer {@code buf}.
+     *
+     * @param record WAL record.
+     * @param buf Buffer to write.
+     * @param writer Record write I/O interface.
+     * @throws IgniteCheckedException If it's unable to write record.
      */
-    private Map<Integer, CacheState> readPartitionStates(DataInput buf) throws IOException {
-        int caches = buf.readShort() & 0xFFFF;
-
-        if (caches == 0)
-            return Collections.emptyMap();
+    public static void writeWithCrc(WALRecord record, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException {
+        assert record.size() >= 0 && buf.remaining() >= record.size() : record.size();
 
-        Map<Integer, CacheState> states = new HashMap<>(caches, 1.0f);
-
-        for (int i = 0; i < caches; i++) {
-            int cacheId = buf.readInt();
+        int startPos = buf.position();
 
-            int parts = buf.readShort() & 0xFFFF;
+        writer.writeWithHeaders(record, buf);
 
-            CacheState state = new CacheState(parts);
+        if (!SKIP_CRC) {
+            int curPos = buf.position();
 
-            for (int p = 0; p < parts; p++) {
-                int partId = buf.readShort() & 0xFFFF;
-                long size = buf.readLong();
-                long partCntr = buf.readLong();
+            buf.position(startPos);
 
-                state.addPartitionState(partId, size, partCntr);
-            }
+            // This call will move buffer position to the end of the record again.
+            int crcVal = PureJavaCrc32.calcCrc32(buf, curPos - startPos);
 
-            states.put(cacheId, state);
+            buf.putInt(crcVal);
         }
-
-        return states;
+        else
+            buf.putInt(0);
     }
 
     /**
@@ -1703,15 +303,4 @@ public class RecordV1Serializer implements RecordSerializer {
             throw new IOException(e);
         }
     }
-
-    /**
-     * @param buf Buffer.
-     * @param rowBytes Row bytes.
-     */
-    @SuppressWarnings("unchecked")
-    private static void putRow(ByteBuffer buf, byte[] rowBytes) {
-        assert rowBytes.length > 0;
-
-        buf.put(rowBytes);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
new file mode 100644
index 0000000..0a5bf01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE;
+
+/**
+ * Record V2 serializer.
+ * Stores records in the following format:
+ * <ul>
+ * <li>Record type from {@link WALRecord.RecordType#ordinal()} incremented by 1</li>
+ * <li>WAL pointer to double check consistency</li>
+ * <li>Record length</li>
+ * <li>Data</li>
+ * <li>CRC or zero padding</li>
+ * </ul>
+ */
+public class RecordV2Serializer implements RecordSerializer {
+    /** Length of WAL Pointer: Index (8) + File offset (4) + Record length (4) */
+    public static final int FILE_WAL_POINTER_SIZE = 8 + 4 + 4;
+
+    /** V2 data serializer. */
+    private final RecordDataV2Serializer dataSerializer;
+
+    /** Write pointer flag returned by {@link #writePointer()}. */
+    private final boolean writePointer;
+
+    /** Record read/write functional interface. */
+    private final RecordIO recordIO = new RecordIO() {
+
+        /** {@inheritDoc} */
+        @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException {
+            return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr)
+            throws IOException, IgniteCheckedException {
+            WALRecord.RecordType recType = RecordV1Serializer.readRecordType(in);
+
+            if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
+                throw new SegmentEofException("Reached end of segment", null);
+
+            // Consume the position header and validate it against expPtr. The parsed pointer
+            // is not used here, the call is needed for its check and to advance the input.
+            readPositionAndCheckPoint(in, expPtr);
+
+            return dataSerializer.readRecord(recType, in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeWithHeaders(
+            WALRecord record,
+            ByteBuffer buf
+        ) throws IgniteCheckedException {
+            // Write record type.
+            RecordV1Serializer.putRecordType(buf, record);
+
+            // Write record file position.
+            putPositionOfRecord(buf, record);
+
+            // Write record data.
+            dataSerializer.writeRecord(record, buf);
+        }
+    };
+
+    /**
+     * Create an instance of Record V2 serializer.
+     * @param dataSerializer V2 data serializer.
+     * @param writePointer Value to be returned by {@link #writePointer()}.
+     */
+    public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer) {
+        this.dataSerializer = dataSerializer;
+        this.writePointer = writePointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int version() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writePointer() {
+        return writePointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size(WALRecord record) throws IgniteCheckedException {
+        return recordIO.sizeWithHeaders(record);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException {
+        RecordV1Serializer.writeWithCrc(record, buf, recordIO);
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
+        return RecordV1Serializer.readWithCrc(in, expPtr, recordIO);
+    }
+
+    /**
+     * @param in Data input to read pointer from.
+     * @param expPtr Expected {@link FileWALPointer}; read index and file offset must match it.
+     * @return Read file WAL pointer.
+     * @throws WalSegmentTailReachedException If read index or file offset differs from the expected one.
+     * @throws IOException If failed to read.
+     */
+    public static FileWALPointer readPositionAndCheckPoint(DataInput in, WALPointer expPtr)
+        throws IgniteCheckedException, IOException {
+        long idx = in.readLong();
+        int fileOffset = in.readInt();
+        int length = in.readInt();
+
+        FileWALPointer p = (FileWALPointer)expPtr;
+
+        if (idx != p.index() || fileOffset != p.fileOffset())
+            throw new WalSegmentTailReachedException(
+                "WAL segment tail is reached. [ " +
+                        "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
+                        "Actual state : {Index=" + idx + ",Offset=" + fileOffset + "} ]", null);
+
+        return new FileWALPointer(idx, fileOffset, length);
+    }
+
+    /**
+     * Writes record position header to {@code buf}: pointer index, file offset and {@code record.size()}.
+     *
+     * @param buf Buffer to write record file position.
+     * @param record WAL record; its {@code position()} is cast to {@link FileWALPointer}.
+     */
+    public static void putPositionOfRecord(ByteBuffer buf, WALRecord record) {
+        FileWALPointer p = (FileWALPointer)record.position();
+
+        buf.putLong(p.index());
+        buf.putInt(p.fileOffset());
+        buf.putInt(record.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/721df25f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java
new file mode 100644
index 0000000..d609e61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/io/RecordIO.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+
+/**
+ * Internal interface to provide size, read and write operations of WAL records
+ * including record header and data.
+ */
+public interface RecordIO {
+    /**
+     * Calculates and returns size of record data and headers.
+     *
+     * @param record WAL record.
+     * @return Size in bytes.
+     * @throws IgniteCheckedException If it's unable to calculate size of record.
+     */
+    int sizeWithHeaders(WALRecord record) throws IgniteCheckedException;
+
+    /**
+     * Reads record data with headers from {@code in}.
+     *
+     * @param in Buffer to read.
+     * @param expPtr Expected WAL pointer for record. Used to validate actual position against expected from the file.
+     * @return WAL record.
+     * @throws IOException In case of I/O problems.
+     * @throws IgniteCheckedException If it's unable to read record.
+     */
+    WALRecord readWithHeaders(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;
+
+    /**
+     * Writes record data with headers to {@code buf}.
+     *
+     * @param record WAL record.
+     * @param buf Buffer to write.
+     * @throws IgniteCheckedException If it's unable to write record.
+     */
+    void writeWithHeaders(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
+}


Mime
View raw message