ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [3/5] ignite git commit: IGNITE-8685 Fixed switch segment record size - Fixes #4130.
Date Sat, 09 Jun 2018 20:28:03 GMT
IGNITE-8685 Fixed switch segment record size - Fixes #4130.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ff1a7e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ff1a7e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ff1a7e2

Branch: refs/heads/ignite-2.6
Commit: 2ff1a7e2368359582704a2c586cbcdf4ddf0af4a
Parents: d519ef5
Author: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Authored: Wed Jun 6 17:20:18 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Sat Jun 9 23:25:12 2018 +0300

----------------------------------------------------------------------
 .../wal/record/BaselineTopologyRecord.java      |  76 ----
 .../internal/pagemem/wal/record/WALRecord.java  |  12 +-
 .../wal/AbstractWalRecordsIterator.java         |  15 +-
 .../wal/reader/StandaloneGridKernalContext.java |   8 +-
 .../BaselineTopologyRecordSerializer.java       | 168 --------
 .../wal/serializer/RecordDataV2Serializer.java  |  16 -
 .../wal/serializer/RecordV1Serializer.java      |  28 +-
 .../wal/serializer/RecordV2Serializer.java      |  27 +-
 .../wal/IgniteWalIteratorSwitchSegmentTest.java | 386 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 10 files changed, 459 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
deleted file mode 100644
index 48b60b3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagemem.wal.record;
-
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Record for storing baseline topology compact node ID to consistent node ID mapping.
- */
-public class BaselineTopologyRecord extends WALRecord {
-    /** Id. */
-    private int id;
-
-    /** Compact ID to consistent ID mapping. */
-    private Map<Short, Object> mapping;
-
-    /**
-     * Default constructor.
-     */
-    private BaselineTopologyRecord() {
-        // No-op, used from factory methods.
-    }
-
-    /**
-     * @param id Baseline topology ID.
-     * @param mapping Compact ID to consistent ID mapping.
-     */
-    public BaselineTopologyRecord(int id, Map<Short, Object> mapping) {
-        this.id = id;
-        this.mapping = mapping;
-    }
-
-    /** {@inheritDoc} */
-    @Override public RecordType type() {
-        return RecordType.BASELINE_TOP_RECORD;
-    }
-
-    /**
-     * Returns baseline topology ID.
-     *
-     * @return Baseline topology ID.
-     */
-    public int id() {
-        return id;
-    }
-
-    /**
-     * Returns mapping.
-     *
-     * @return Compact ID to consistent ID mapping.
-     */
-    public Map<Short, Object> mapping() {
-        return mapping;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(BaselineTopologyRecord.class, this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 4fae179..87ba07d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -153,7 +153,13 @@ public abstract class WALRecord {
         /** Page list meta reset count record. */
         PAGE_LIST_META_RESET_COUNT_RECORD,
 
-        /** Switch segment record. */
+        /** Switch segment record.
+         *  Marker record for indicate end of segment.
+         *  If the next one record is written down exactly at the end of segment,
+         *  SWITCH_SEGMENT_RECORD will not be written, if not then it means that we have
more
+         *  that one byte in the end,then we write SWITCH_SEGMENT_RECORD as marker end of
segment.
+         *  No need write CRC or WAL pointer for this record. It is byte marker record.
+         *  */
         SWITCH_SEGMENT_RECORD,
 
         /** */
@@ -174,8 +180,8 @@ public abstract class WALRecord {
         /** Exchange record. */
         EXCHANGE,
 
-        /** Baseline topology record. */
-        BASELINE_TOP_RECORD;
+        /** Reserved for future record. */
+        RESERVED;
 
         /** */
         private static final RecordType[] VALS = RecordType.values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index d9312f6..e442386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -190,18 +190,20 @@ public abstract class AbstractWalRecordsIterator
     /**
      * Switches records iterator to the next WAL segment
      * as result of this method, new reference to segment should be returned.
-     * Null for current handle means stop of iteration
-     * @throws IgniteCheckedException if reading failed
+     * Null for current handle means stop of iteration.
+     *
      * @param curWalSegment current open WAL segment or null if there is no open segment
yet
      * @return new WAL segment to read or null for stop iteration
+     * @throws IgniteCheckedException if reading failed
      */
     protected abstract AbstractReadFileHandle advanceSegment(
         @Nullable final AbstractReadFileHandle curWalSegment) throws IgniteCheckedException;
 
     /**
-     * Switches to new record
-     * @param hnd currently opened read handle
-     * @return next advanced record
+     * Switches to new record.
+     *
+     * @param hnd currently opened read handle.
+     * @return next advanced record.
      */
     private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
         @Nullable final AbstractReadFileHandle hnd
@@ -242,7 +244,8 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
-     * Handler for record deserialization exception
+     * Handler for record deserialization exception.
+     *
      * @param e problem from records reading
      * @param ptr file pointer was accessed
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index cb04575..795d460 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -128,9 +128,11 @@ public class StandaloneGridKernalContext implements GridKernalContext
{
      * {@code null} means no specific folder is configured.
      * Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects
will be provided <br>
      */
-    StandaloneGridKernalContext(IgniteLogger log,
+    public StandaloneGridKernalContext(
+        IgniteLogger log,
         @Nullable File binaryMetadataFileStoreDir,
-        @Nullable File marshallerMappingFileStoreDir) throws IgniteCheckedException {
+        @Nullable File marshallerMappingFileStoreDir
+    ) throws IgniteCheckedException {
         this.log = log;
 
         try {
@@ -179,7 +181,7 @@ public class StandaloneGridKernalContext implements GridKernalContext
{
     /**
      * @return Ignite configuration which allows to start requied processors for WAL reader
      */
-    private IgniteConfiguration prepareIgniteConfiguration() {
+    protected IgniteConfiguration prepareIgniteConfiguration() {
         IgniteConfiguration cfg = new IgniteConfiguration();
 
         cfg.setDiscoverySpi(new StandaloneNoopDiscoverySpi());

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/BaselineTopologyRecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/BaselineTopologyRecordSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/BaselineTopologyRecordSerializer.java
deleted file mode 100644
index 94b51c5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/BaselineTopologyRecordSerializer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.record.BaselineTopologyRecord;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * {@link BaselineTopologyRecord} WAL serializer.
- */
-public class BaselineTopologyRecordSerializer {
-    /** Cache shared context. */
-    private GridCacheSharedContext cctx;
-
-    /** Class loader to unmarshal consistent IDs. */
-    private ClassLoader clsLdr;
-
-    /**
-     * Create an instance of serializer.
-     *
-     * @param cctx Cache shared context.
-     */
-    public BaselineTopologyRecordSerializer(GridCacheSharedContext cctx) {
-        this.cctx = cctx;
-
-        clsLdr = U.resolveClassLoader(cctx.gridConfig());
-    }
-
-    /**
-     * Writes {@link BaselineTopologyRecord} to given buffer.
-     *
-     * @param rec {@link BaselineTopologyRecord} instance.
-     * @param buf Byte buffer.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    public void write(BaselineTopologyRecord rec, ByteBuffer buf) throws IgniteCheckedException
{
-        buf.putInt(rec.id());
-
-        Map<Short, Object> mapping = rec.mapping();
-
-        if (mapping != null && !mapping.isEmpty()) {
-            buf.putInt(mapping.size());
-
-            for (Map.Entry<Short, Object> e : mapping.entrySet()) {
-                buf.putShort(e.getKey());
-
-                writeConsistentId(e.getValue(), buf);
-            }
-        }
-        else
-            buf.putInt(0);
-    }
-
-    /**
-     * Reads {@link BaselineTopologyRecord} from given input.
-     *
-     * @param in Input
-     * @return BaselineTopologyRecord instance.
-     * @throws IOException In case of fail.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    public BaselineTopologyRecord read(ByteBufferBackedDataInput in) throws IOException,
IgniteCheckedException {
-        int id = in.readInt();
-
-        int size = in.readInt();
-
-        Map<Short, Object> mapping = size > 0 ? U.<Short, Object>newHashMap(size)
: null;
-
-        for (int i = 0; i < size; i++) {
-            short compactId = in.readShort();
-
-            Object consistentId = readConsistentId(in);
-
-            mapping.put(compactId, consistentId);
-        }
-
-        return new BaselineTopologyRecord(id, mapping);
-    }
-
-    /**
-     * Returns size of marshalled {@link BaselineTopologyRecord} in bytes.
-     *
-     * @param rec BaselineTopologyRecord instance.
-     * @return Size of BaselineTopologyRecord instance in bytes.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    public int size(BaselineTopologyRecord rec) throws IgniteCheckedException {
-        int size = 0;
-
-        size += /* Baseline topology ID. */ 4;
-
-        size += /* Consistent ID mapping size. */ 4;
-
-        if (rec.mapping() != null) {
-            for (Object consistentId : rec.mapping().values()) {
-                size += /* Compact ID size */ 2;
-
-                size += marshalConsistentId(consistentId).length;
-            }
-        }
-
-        return size;
-    }
-
-    /**
-     * Write consistent id to given buffer.
-     *
-     * @param consistentId Consistent id.
-     * @param buf Byte buffer.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    private void writeConsistentId(Object consistentId, ByteBuffer buf) throws IgniteCheckedException
{
-        byte[] content = marshalConsistentId(consistentId);
-
-        buf.putInt(content.length);
-        buf.put(content);
-    }
-
-    /**
-     * Read consistent id from given input.
-     *
-     * @param in Input.
-     * @return Consistent id.
-     * @throws IOException In case of fail.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    private Object readConsistentId(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException
{
-        int len = in.readInt();
-        in.ensure(len);
-
-        byte[] content = new byte[len];
-        in.readFully(content);
-
-        return cctx.marshaller().unmarshal(content, clsLdr);
-    }
-
-    /**
-     * Marshal consistent id to byte array.
-     *
-     * @param consistentId Consistent id.
-     * @return Marshalled byte array.
-     * @throws IgniteCheckedException In case of fail.
-     */
-    private byte[] marshalConsistentId(Object consistentId) throws IgniteCheckedException
{
-        return cctx.marshaller().marshal(consistentId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index b3a00be..b760547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.wal.record.BaselineTopologyRecord;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -53,9 +52,6 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
     /** Serializer of {@link TxRecord} records. */
     private final TxRecordSerializer txRecordSerializer;
 
-    /** Serializer of {@link BaselineTopologyRecord} records. */
-    private final BaselineTopologyRecordSerializer bltRecSerializer;
-
     /**
      * Create an instance of V2 data serializer.
      *
@@ -64,7 +60,6 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
     public RecordDataV2Serializer(RecordDataV1Serializer delegateSerializer) {
         this.delegateSerializer = delegateSerializer;
         this.txRecordSerializer = new TxRecordSerializer();
-        this.bltRecSerializer = new BaselineTopologyRecordSerializer(delegateSerializer.cctx());
     }
 
     /** {@inheritDoc} */
@@ -97,9 +92,6 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
             case TX_RECORD:
                 return txRecordSerializer.size((TxRecord)rec);
 
-            case BASELINE_TOP_RECORD:
-                return bltRecSerializer.size((BaselineTopologyRecord)rec);
-
             default:
                 return delegateSerializer.size(rec);
         }
@@ -157,9 +149,6 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
             case TX_RECORD:
                 return txRecordSerializer.read(in);
 
-            case BASELINE_TOP_RECORD:
-                return bltRecSerializer.read(in);
-
             default:
                 return delegateSerializer.readRecord(type, in);
         }
@@ -231,11 +220,6 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
 
                 break;
 
-            case BASELINE_TOP_RECORD:
-                bltRecSerializer.write((BaselineTopologyRecord)rec, buf);
-
-                break;
-
             default:
                 delegateSerializer.writeRecord(rec, buf);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index dd0819c..caa0962 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
 
 /**
  * Record V1 serializer.
@@ -111,7 +112,13 @@ public class RecordV1Serializer implements RecordSerializer {
 
         /** {@inheritDoc} */
         @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException
{
-            return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE +
CRC_SIZE;
+            int recordSize = dataSerializer.size(record);
+
+            int recordSizeWithType = recordSize + REC_TYPE_SIZE;
+
+            // Why this condition here, see SWITCH_SEGMENT_RECORD doc.
+            return record.type() != SWITCH_SEGMENT_RECORD ?
+                recordSizeWithType + FILE_WAL_POINTER_SIZE + CRC_SIZE : recordSizeWithType;
         }
 
         /** {@inheritDoc} */
@@ -160,6 +167,10 @@ public class RecordV1Serializer implements RecordSerializer {
             // Write record type.
             putRecordType(buf, rec);
 
+            // SWITCH_SEGMENT_RECORD should have only type, no need to write pointer.
+            if (rec.type() == SWITCH_SEGMENT_RECORD)
+                return;
+
             // Write record file position.
             putPositionOfRecord(buf, rec);
 
@@ -176,8 +187,13 @@ public class RecordV1Serializer implements RecordSerializer {
      * @param skipPositionCheck Skip position check mode.
      * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead
of original record
      */
-    public RecordV1Serializer(RecordDataV1Serializer dataSerializer, boolean writePointer,
-        boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType,
WALPointer> recordFilter) {
+    public RecordV1Serializer(
+        RecordDataV1Serializer dataSerializer,
+        boolean writePointer,
+        boolean marshalledMode,
+        boolean skipPositionCheck,
+        IgniteBiPredicate<RecordType, WALPointer> recordFilter
+    ) {
         this.dataSerializer = dataSerializer;
         this.writePointer = writePointer;
         this.recordFilter = recordFilter;
@@ -376,10 +392,16 @@ public class RecordV1Serializer implements RecordSerializer {
     static void writeWithCrc(WALRecord rec, ByteBuffer buf, RecordIO writer) throws IgniteCheckedException
{
         assert rec.size() >= 0 && buf.remaining() >= rec.size() : rec.size();
 
+        boolean switchSegmentRec = rec.type() == RecordType.SWITCH_SEGMENT_RECORD;
+
         int startPos = buf.position();
 
         writer.writeWithHeaders(rec, buf);
 
+        // No need calculate and write CRC for SWITCH_SEGMENT_RECORD.
+        if (switchSegmentRec)
+            return;
+
         if (!skipCrc) {
             int curPos = buf.position();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index feeb810..2b81210 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiPredicate;
 
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.REC_TYPE_SIZE;
 
@@ -93,7 +94,13 @@ public class RecordV2Serializer implements RecordSerializer {
 
         /** {@inheritDoc} */
         @Override public int sizeWithHeaders(WALRecord record) throws IgniteCheckedException
{
-            return dataSerializer.size(record) + REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE +
CRC_SIZE;
+            int recordSize = dataSerializer.size(record);
+
+            int recordSizeWithType = recordSize + REC_TYPE_SIZE;
+
+            // Why this condition here, see SWITCH_SEGMENT_RECORD doc.
+            return record.type() != SWITCH_SEGMENT_RECORD ?
+                recordSizeWithType + FILE_WAL_POINTER_SIZE + CRC_SIZE : recordSizeWithType;
         }
 
         /** {@inheritDoc} */
@@ -103,7 +110,7 @@ public class RecordV2Serializer implements RecordSerializer {
         ) throws IOException, IgniteCheckedException {
             WALRecord.RecordType recType = RecordV1Serializer.readRecordType(in);
 
-            if (recType == WALRecord.RecordType.SWITCH_SEGMENT_RECORD)
+            if (recType == SWITCH_SEGMENT_RECORD)
                 throw new SegmentEofException("Reached end of segment", null);
 
             FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck);
@@ -162,6 +169,10 @@ public class RecordV2Serializer implements RecordSerializer {
             // Write record type.
             RecordV1Serializer.putRecordType(buf, record);
 
+            // SWITCH_SEGMENT_RECORD should have only type, no need to write pointer.
+            if (record.type() == SWITCH_SEGMENT_RECORD)
+                return;
+
             // Write record file position.
             putPositionOfRecord(buf, record);
 
@@ -172,13 +183,19 @@ public class RecordV2Serializer implements RecordSerializer {
 
     /**
      * Create an instance of Record V2 serializer.
+     *
      * @param dataSerializer V2 data serializer.
      * @param marshalledMode Marshalled mode.
      * @param skipPositionCheck Skip position check mode.
-     * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead
of original record
+     * @param recordFilter Record type filter. {@link FilteredRecord} is deserialized instead
of original record.
      */
-    public RecordV2Serializer(RecordDataV2Serializer dataSerializer, boolean writePointer,
-        boolean marshalledMode, boolean skipPositionCheck, IgniteBiPredicate<RecordType,
WALPointer> recordFilter) {
+    public RecordV2Serializer(
+        RecordDataV2Serializer dataSerializer,
+        boolean writePointer,
+        boolean marshalledMode,
+        boolean skipPositionCheck,
+        IgniteBiPredicate<RecordType, WALPointer> recordFilter
+    ) {
         this.dataSerializer = dataSerializer;
         this.writePointer = writePointer;
         this.marshalledMode = marshalledMode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
new file mode 100644
index 0000000..b30466e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Random;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheIoManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+
+/***
+ * Test check correct switch segment if in the tail of segment have garbage.
+ */
+public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
+    /** Segment file size. */
+    private static final int SEGMENT_SIZE = 1024 * 1024;
+
+    /** WAL segment file sub directory. */
+    private static final String WORK_SUB_DIR = "/NODE/wal";
+
+    /** WAL archive segment file sub directory. */
+    private static final String ARCHIVE_SUB_DIR = "/NODE/walArchive";
+
+    /** Serializer versions for check. */
+    private int[] checkSerializerVers = new int[] {
+        1,
+        2
+    };
+
+    /** FileWriteAheadLogManagers for check. */
+    private Class[] checkWalManagers = new Class[] {
+        FileWriteAheadLogManager.class,
+        FsyncModeFileWriteAheadLogManager.class
+    };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        U.delete(Paths.get(U.defaultWorkDirectory()));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        U.delete(Paths.get(U.defaultWorkDirectory()));
+    }
+
+    /**
+     * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte.
+     *
+     * @throws Exception If some thing failed.
+     */
+    public void testCheckSerializer() throws Exception {
+        for (int serVer : checkSerializerVers) {
+            checkInvariantSwitchSegmentSize(serVer);
+        }
+    }
+
+    /**
+     * @param serVer WAL serializer version.
+     * @throws Exception If some thing failed.
+     */
+    private void checkInvariantSwitchSegmentSize(int serVer) throws Exception {
+        GridKernalContext kctx = new StandaloneGridKernalContext(
+            log, null, null) {
+            @Override public IgniteCacheObjectProcessor cacheObjects() {
+                return new IgniteCacheObjectProcessorImpl(this);
+            }
+        };
+
+        RecordSerializer serializer = new RecordSerializerFactoryImpl(
+            new GridCacheSharedContext<>(
+                kctx,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                new IgniteCacheDatabaseSharedManager() {
+                    @Override public int pageSize() {
+                        return DataStorageConfiguration.DFLT_PAGE_SIZE;
+                    }
+                },
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+
+                null)
+        ).createSerializer(serVer);
+
+        SwitchSegmentRecord switchSegmentRecord = new SwitchSegmentRecord();
+
+        int recordSize = serializer.size(switchSegmentRecord);
+
+        Assert.assertEquals(1, recordSize);
+    }
+
+    /**
+     * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte.
+     *
+     * @throws Exception If some thing failed.
+     */
+    public void test() throws Exception {
+        for (int serVer : checkSerializerVers) {
+            for (Class walMgrClass : checkWalManagers) {
+                try {
+                    log.info("checking wal manager " + walMgrClass + " with serializer version
" + serVer);
+
+                    checkInvariantSwitchSegment(walMgrClass, serVer);
+                }
+                finally {
+                    U.delete(Paths.get(U.defaultWorkDirectory()));
+                }
+            }
+        }
+    }
+
+    /**
+     * @param walMgrClass WAL manager class.
+     * @param serVer WAL serializer version.
+     * @throws Exception If some thing failed.
+     */
+    private void checkInvariantSwitchSegment(Class walMgrClass, int serVer) throws Exception
{
+        String workDir = U.defaultWorkDirectory();
+
+        T2<IgniteWriteAheadLogManager, RecordSerializer> initTup = initiate(walMgrClass,
serVer, workDir);
+
+        IgniteWriteAheadLogManager walMgr = initTup.get1();
+
+        RecordSerializer recordSerializer = initTup.get2();
+
+        int switchSegmentRecordSize = recordSerializer.size(new SwitchSegmentRecord());
+
+        log.info("switchSegmentRecordSize:" + switchSegmentRecordSize);
+
+        int tailSize = 0;
+
+        /* Initial record payload size. */
+        int payloadSize = 1024;
+
+        int recSize = 0;
+
+        MetastoreDataRecord rec = null;
+
+        /* Record size. */
+        int recordTypeSize = 1;
+
+        /* Record pointer. */
+        int recordPointerSize = 8 + 4 + 4;
+
+        int lowBound = recordTypeSize + recordPointerSize;
+        int highBound = lowBound + /*CRC*/4;
+
+        int attempt = 1000;
+
+        // Try find how many record need for specific tail size.
+        while (true) {
+            if (attempt < 0)
+                throw new IgniteCheckedException("Can not find any payload size for test,
" +
+                    "lowBound=" + lowBound + ", highBound=" + highBound);
+
+            if (tailSize >= lowBound && tailSize < highBound)
+                break;
+
+            payloadSize++;
+
+            byte[] payload = new byte[payloadSize];
+
+            // Fake record for payload.
+            rec = new MetastoreDataRecord("0", payload);
+
+            recSize = recordSerializer.size(rec);
+
+            tailSize = (SEGMENT_SIZE - HEADER_RECORD_SIZE) % recSize;
+
+            attempt--;
+        }
+
+        Assert.assertNotNull(rec);
+
+        int recordsToWrite = SEGMENT_SIZE / recSize;
+
+        log.info("records to write " + recordsToWrite + " tail size " +
+            (SEGMENT_SIZE - HEADER_RECORD_SIZE) % recSize);
+
+        // Add more record for rollover to the next segment.
+        recordsToWrite += 100;
+
+        for (int i = 0; i < recordsToWrite; i++) {
+            walMgr.log(new MetastoreDataRecord(rec.key(), rec.value()));
+        }
+
+        walMgr.flush(null, true);
+
+        // Await archiver move segment to WAL archive.
+        Thread.sleep(5000);
+
+        // If switchSegmentRecordSize more that 1, it mean that invariant is broke.
+        // Filling tail some garbage. Simulate tail garbage on rotate segment in WAL work
directory.
+        if (switchSegmentRecordSize > 1) {
+            File seg = new File(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal");
+
+            FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+            FileIO seg0 = ioFactory.create(seg);
+
+            byte[] bytes = new byte[tailSize];
+
+            Random rnd = new Random();
+
+            rnd.nextBytes(bytes);
+
+            // Some record type.
+            bytes[0] = (byte)(METASTORE_DATA_RECORD.ordinal() + 1);
+
+            seg0.position((int)(seg0.size() - tailSize));
+
+            seg0.write(bytes, 0, tailSize);
+
+            seg0.force(true);
+
+            seg0.close();
+        }
+
+        int expectedRecords = recordsToWrite;
+        int actualRecords = 0;
+
+        // Check that switch segment works as expected and all record is reachable.
+        try (WALIterator it = walMgr.replay(null)) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                WALRecord rec0 = tup.get2();
+
+                if (rec0.type() == METASTORE_DATA_RECORD)
+                    actualRecords++;
+            }
+        }
+
+        Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords);
+    }
+
+    /***
+     * Initiate WAL manager.
+     *
+     * @param walMgrClass WAL manager class.
+     * @param serVer WAL serializer version.
+     * @param workDir Work directory path.
+     * @return Tuple of WAL manager and WAL record serializer.
+     * @throws IgniteCheckedException If some think failed.
+     */
+    private T2<IgniteWriteAheadLogManager, RecordSerializer> initiate(
+        Class walMgrClass,
+        int serVer,
+        String workDir
+    ) throws IgniteCheckedException {
+
+        GridKernalContext kctx = new StandaloneGridKernalContext(
+            log, null, null
+        ) {
+            @Override protected IgniteConfiguration prepareIgniteConfiguration() {
+                IgniteConfiguration cfg = super.prepareIgniteConfiguration();
+
+                cfg.setDataStorageConfiguration(
+                    new DataStorageConfiguration()
+                        .setWalSegmentSize(SEGMENT_SIZE)
+                        .setWalMode(WALMode.FSYNC)
+                        .setWalPath(workDir + WORK_SUB_DIR)
+                        .setWalArchivePath(workDir + ARCHIVE_SUB_DIR)
+                );
+
+                cfg.setEventStorageSpi(new NoopEventStorageSpi());
+
+                return cfg;
+            }
+
+            @Override public GridInternalSubscriptionProcessor internalSubscriptionProcessor()
{
+                return new GridInternalSubscriptionProcessor(this);
+            }
+
+            @Override public GridEventStorageManager event() {
+                return new GridEventStorageManager(this);
+            }
+        };
+
+        IgniteWriteAheadLogManager walMgr = null;
+
+        if (walMgrClass.equals(FileWriteAheadLogManager.class)) {
+            walMgr = new FileWriteAheadLogManager(kctx);
+
+            GridTestUtils.setFieldValue(walMgr, "serializerVer", serVer);
+        }
+        else if (walMgrClass.equals(FsyncModeFileWriteAheadLogManager.class)) {
+            walMgr = new FsyncModeFileWriteAheadLogManager(kctx);
+
+            GridTestUtils.setFieldValue(walMgr, "serializerVersion", serVer);
+        }
+
+        GridCacheSharedContext<?, ?> ctx = new GridCacheSharedContext<>(
+            kctx,
+            null,
+            null,
+            null,
+            null,
+            walMgr,
+            null,
+            new GridCacheDatabaseSharedManager(kctx),
+            null,
+            null,
+            null,
+            null,
+            new GridCacheIoManager(),
+            null,
+            null,
+            null
+        );
+
+        walMgr.start(ctx);
+
+        walMgr.onActivate(kctx);
+
+        walMgr.resumeLogging(null);
+
+        RecordSerializer recordSerializer = new RecordSerializerFactoryImpl(ctx)
+            .createSerializer(walMgr.serializerVersion());
+
+        return new T2<>(walMgr, recordSerializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ff1a7e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index c85965f..50625f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFailoverTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
@@ -146,5 +147,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(IgnitePdsCorruptedStoreTest.class);
 
         suite.addTestSuite(LocalWalModeChangeDuringRebalancingSelfTest.class);
+
+        suite.addTestSuite(IgniteWalIteratorSwitchSegmentTest.class);
     }
 }


Mime
View raw message