ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [03/25] ignite git commit: IGNITE-6547 Support logging timestamp for WAL tx and data records - Fixes #2792.
Date Mon, 09 Oct 2017 10:42:19 GMT
IGNITE-6547 Support logging timestamp for WAL tx and data records - Fixes #2792.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6e1ca9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6e1ca9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6e1ca9a

Branch: refs/heads/ignite-3478
Commit: e6e1ca9a5a9155a550258b112415b65845d6bcef
Parents: 78f77b1
Author: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Authored: Wed Oct 4 18:54:49 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Oct 4 18:54:49 2017 +0300

----------------------------------------------------------------------
 .../internal/pagemem/wal/record/DataRecord.java |  20 +-
 .../pagemem/wal/record/TimeStampRecord.java     |  57 ++++++
 .../internal/pagemem/wal/record/TxRecord.java   |  52 +++--
 .../reader/StandaloneWalRecordsIterator.java    |   2 +-
 .../wal/serializer/RecordDataV1Serializer.java  |   6 +-
 .../wal/serializer/RecordDataV2Serializer.java  |  49 ++++-
 .../wal/serializer/TxRecordSerializer.java      |   3 +-
 .../cache/transactions/IgniteTxAdapter.java     |   3 +-
 .../db/wal/IgniteWalSerializerVersionTest.java  | 205 ++++++++++++++++++-
 9 files changed, 365 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 0e92383..ac569bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  * This record contains information about operation we want to do.
  * Contains operation type (put, remove) and (Key, Value, Version) for each {@link DataEntry}
  */
-public class DataRecord extends WALRecord {
+public class DataRecord extends TimeStampRecord {
     /** */
     @GridToStringInclude
     private List<DataEntry> writeEntries;
@@ -59,6 +59,24 @@ public class DataRecord extends WALRecord {
     }
 
     /**
+     * @param writeEntry Write entry.
+     * @param timestamp TimeStamp.
+     */
+    public DataRecord(DataEntry writeEntry, long timestamp) {
+        this(Collections.singletonList(writeEntry), timestamp);
+    }
+
+    /**
+     * @param writeEntries Write entries.
+     * @param timestamp TimeStamp.
+     */
+    public DataRecord(List<DataEntry> writeEntries, long timestamp) {
+        super(timestamp);
+
+        this.writeEntries = writeEntries;
+    }
+
+    /**
      * @return Collection of write entries.
      */
     public List<DataEntry> writeEntries() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
new file mode 100644
index 0000000..3f29dfd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Base class for records with timeStamp.
+ * All records which support timeStamp should be inherited from {@code TimeStampRecord}.
+ */
+public abstract class TimeStampRecord extends WALRecord {
+    /** Timestamp. */
+    protected long timestamp;
+
+    /**
+     *
+     */
+    protected TimeStampRecord() {
+        timestamp = U.currentTimeMillis();
+    }
+
+    /**
+     * @param timestamp TimeStamp.
+     */
+    protected TimeStampRecord(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @param timestamp TimeStamp.
+     */
+    public void timestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @return TimeStamp.
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index ce1e28e..f933fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  * Logical data record indented for transaction (tx) related actions.<br>
  * This record is marker of begin, prepare, commit, and rollback transactions.
  */
-public class TxRecord extends WALRecord {
+public class TxRecord extends TimeStampRecord {
     /** Transaction state. */
     private TransactionState state;
 
@@ -49,28 +49,51 @@ public class TxRecord extends WALRecord {
     /** If transaction is remote, primary node for this backup node. */
     @Nullable private Object primaryNode;
 
-    /** Timestamp of Tx state change. */
-    private long timestamp;
-
     /**
      *
      * @param state Transaction state.
      * @param nearXidVer Transaction id.
      * @param writeVer Transaction entries write topology version.
      * @param participatingNodes Primary -> Backup nodes participating in transaction.
+     * @param primaryNode Primary node.
      */
-    public TxRecord(TransactionState state,
-                    GridCacheVersion nearXidVer,
-                    GridCacheVersion writeVer,
-                    @Nullable Map<Object, Collection<Object>> participatingNodes,
-                    @Nullable Object primaryNode,
-                    long timestamp) {
+    public TxRecord(
+        TransactionState state,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        @Nullable Map<Object, Collection<Object>> participatingNodes,
+        @Nullable Object primaryNode
+    ) {
+        this.state = state;
+        this.nearXidVer = nearXidVer;
+        this.writeVer = writeVer;
+        this.participatingNodes = participatingNodes;
+        this.primaryNode = primaryNode;
+    }
+
+    /**
+     * @param state Transaction state.
+     * @param nearXidVer Transaction id.
+     * @param writeVer Transaction entries write topology version.
+     * @param participatingNodes Primary -> Backup nodes participating in transaction.
+     * @param primaryNode Primary node.
+     * @param timestamp TimeStamp.
+     */
+    public TxRecord(
+        TransactionState state,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        @Nullable Map<Object, Collection<Object>> participatingNodes,
+        @Nullable Object primaryNode,
+        long timestamp
+    ) {
+        super(timestamp);
+
         this.state = state;
         this.nearXidVer = nearXidVer;
         this.writeVer = writeVer;
         this.participatingNodes = participatingNodes;
         this.primaryNode = primaryNode;
-        this.timestamp = timestamp;
     }
 
     /** {@inheritDoc} */
@@ -148,13 +171,6 @@ public class TxRecord extends WALRecord {
         return primaryNode;
     }
 
-    /**
-     * @return Timestamp of Tx state change in millis.
-     */
-    public long timestamp() {
-        return timestamp;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TxRecord.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index f1258a0..24b2148 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -305,7 +305,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
 
             postProcessedEntries.add(postProcessedEntry);
         }
-        return new DataRecord(postProcessedEntries);
+        return new DataRecord(postProcessedEntries, dataRec.timestamp());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 8b5e6ba..e583df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -384,7 +384,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 for (int i = 0; i < entryCnt; i++)
                     entries.add(readDataEntry(in));
 
-                res = new DataRecord(entries);
+                res = new DataRecord(entries, 0L);
 
                 break;
 
@@ -1322,7 +1322,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @param buf Buffer to write to.
      * @param entry Data entry.
      */
-    private static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
+    static void putDataEntry(ByteBuffer buf, DataEntry entry) throws IgniteCheckedException {
         buf.putInt(entry.cacheId());
 
         if (!entry.key().putValue(buf))
@@ -1390,7 +1390,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @param in Input to read from.
      * @return Read entry.
      */
-    private DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
+    DataEntry readDataEntry(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
         int cacheId = in.readInt();
 
         int keySize = in.readInt();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 2b55c5f..c02f36e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.RecordDataSerializer;
@@ -46,12 +50,35 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
         if (record instanceof HeaderRecord)
             throw new UnsupportedOperationException("Getting size of header records is forbidden since version 2 of serializer");
 
-        return delegateSerializer.size(record);
+        switch (record.type()) {
+            case DATA_RECORD:
+                return delegateSerializer.size(record) + 8/*timestamp*/;
+
+            default:
+                return delegateSerializer.size(record);
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException {
-        return delegateSerializer.readRecord(type, in);
+    @Override public WALRecord readRecord(
+        WALRecord.RecordType type,
+        ByteBufferBackedDataInput in
+    ) throws IOException, IgniteCheckedException {
+        switch (type) {
+            case DATA_RECORD:
+                int entryCnt = in.readInt();
+                long timeStamp = in.readLong();
+
+                List<DataEntry> entries = new ArrayList<>(entryCnt);
+
+                for (int i = 0; i < entryCnt; i++)
+                    entries.add(delegateSerializer.readDataEntry(in));
+
+                return new DataRecord(entries, timeStamp);
+
+            default:
+                return delegateSerializer.readRecord(type, in);
+        }
     }
 
     /** {@inheritDoc} */
@@ -59,6 +86,20 @@ public class RecordDataV2Serializer implements RecordDataSerializer {
         if (record instanceof HeaderRecord)
             throw new UnsupportedOperationException("Writing header records is forbidden since version 2 of serializer");
 
-        delegateSerializer.writeRecord(record, buf);
+        switch (record.type()) {
+            case DATA_RECORD:
+                DataRecord dataRec = (DataRecord)record;
+
+                buf.putInt(dataRec.writeEntries().size());
+                buf.putLong(dataRec.timestamp());
+
+                for (DataEntry dataEntry : dataRec.writeEntries())
+                    RecordDataV1Serializer.putDataEntry(buf, dataEntry);
+
+                break;
+
+            default:
+                delegateSerializer.writeRecord(record, buf);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
index 448bdbc..e8b324d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
@@ -75,9 +75,8 @@ public class TxRecordSerializer {
 
                 buf.putInt(backupNodes.size());
 
-                for (Object backupNode : backupNodes) {
+                for (Object backupNode : backupNodes)
                     writeConsistentId(backupNode, buf);
-                }
             }
         }
         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b5178b5..00c637e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1116,8 +1116,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                             nearXidVersion(),
                             writeVersion(),
                             participatingNodes,
-                            remote() ? nodeId() : null,
-                            U.currentTimeMillis()
+                            remote() ? nodeId() : null
                     );
 
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6e1ca9a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
index f31d0f9..ddf74c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalSerializerVersionTest.java
@@ -17,23 +17,38 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionState;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 
 /**
  *
@@ -46,7 +61,7 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
 
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
 
         cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration());
 
@@ -101,4 +116,192 @@ public class IgniteWalSerializerVersionTest extends GridCommonAbstractTest {
 
         stopGrid();
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCheckDifferentSerializerVersionsAndLogTimestamp() throws Exception {
+        IgniteCallable<List<WALRecord>> recordsFactory = new IgniteCallable<List<WALRecord>>() {
+            @Override public List<WALRecord> call() throws Exception {
+                WALRecord rec0 = new DataRecord(Collections.<DataEntry>emptyList());
+
+                WALRecord rec1 = new TxRecord(
+                    TransactionState.PREPARED,
+                    null,
+                    null,
+                    null,
+                    null
+                );
+
+                return Arrays.asList(rec0, rec1);
+            }
+        };
+
+        long time0 = U.currentTimeMillis();
+
+        check(new Checker(
+            1,
+            RecordV1Serializer.class,
+            recordsFactory,
+            Arrays.asList(0L, time0)
+        ));
+
+        long time1 = U.currentTimeMillis();
+
+        check(new Checker(
+            2,
+            RecordV2Serializer.class,
+            recordsFactory,
+            Arrays.asList(time1, time1)
+        ));
+    }
+
+    /**
+     *
+     */
+    public static class Checker {
+        /** */
+        private final int serializerVer;
+
+        /** */
+        private final Class serializer;
+
+        /** */
+        private final List<Long> timeStamps;
+
+        /** */
+        private final IgniteCallable<List<WALRecord>> recordsToWrite;
+
+        /**
+         *
+         */
+        public Checker(
+            int serializerVer,
+            Class serializer,
+            IgniteCallable<List<WALRecord>> recordsToWrite,
+            List<Long> timeStamps) {
+            this.serializerVer = serializerVer;
+            this.serializer = serializer;
+            this.timeStamps = timeStamps;
+            this.recordsToWrite = recordsToWrite;
+        }
+
+        /**
+         *
+         */
+        public int serializerVersion() {
+            return serializerVer;
+        }
+
+        /**
+         *
+         */
+        public Class serializer() {
+            return serializer;
+        }
+
+        /**
+         *
+         */
+        public List<Long> getTimeStamps() {
+            return timeStamps;
+        }
+
+        /**
+         *
+         */
+        public List<WALRecord> recordsToWrite() throws Exception {
+            return recordsToWrite.call();
+        }
+
+        /**
+         *
+         */
+        public void assertRecords(long exp, WALRecord act) {
+            if (act instanceof TimeStampRecord) {
+                TimeStampRecord act0 = (TimeStampRecord)act;
+
+                if (exp == 0L)
+                    assertTrue(act0.timestamp() == 0L);
+                else{
+                    long diff = Math.abs(exp - act0.timestamp());
+
+                    assertTrue(String.valueOf(diff), diff < 10_000);
+                }
+            }
+            else
+                fail();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void check(Checker checker) throws Exception {
+        System.setProperty(IGNITE_WAL_SERIALIZER_VERSION, Integer.toString(checker.serializerVersion()));
+
+        IgniteEx ig0 = (IgniteEx)startGrid();
+
+        ig0.active(true);
+
+        IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal();
+
+        RecordSerializer ser0 = U.field(wal, "serializer");
+
+        assertTrue(ser0.getClass().getName().equals(checker.serializer().getName()));
+
+        List<WALRecord> recs = checker.recordsToWrite();
+
+        assertTrue(!recs.isEmpty());
+
+        WALPointer p = null;
+
+        for (WALRecord rec : recs) {
+            WALPointer p0 = wal.log(rec);
+
+            if (p == null)
+                p = p0;
+        }
+
+        wal.fsync(null);
+
+        Iterator<Long> itToCheck = checker.getTimeStamps().iterator();
+
+        try (WALIterator it = wal.replay(p)) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup0 = it.next();
+
+                checker.assertRecords(itToCheck.next(), tup0.get2());
+            }
+        }
+
+        stopGrid();
+
+        System.clearProperty(IGNITE_WAL_SERIALIZER_VERSION);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
+    }
 }


Mime
View raw message