ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [2/2] ignite git commit: IGNITE-5323 - Moved record serializer version from file name to file header
Date Mon, 05 Jun 2017 08:41:56 GMT
IGNITE-5323 - Moved record serializer version from file name to file header


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b47db106
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b47db106
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b47db106

Branch: refs/heads/ignite-5267
Commit: b47db106d42afb5dcbee57b793b63efa43fc4ef2
Parents: e1c328e
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Mon Jun 5 11:41:42 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Jun 5 11:41:42 2017 +0300

----------------------------------------------------------------------
 .../database/wal/FileWriteAheadLogManager.java  | 105 ++++++++++++-------
 .../wal/serializer/RecordV1Serializer.java      |  13 ++-
 2 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b47db106/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index 4b79308..8a113ba 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -79,10 +79,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
     private static final byte[] FILL_BUF = new byte[1024 * 1024];
 
     /** */
-    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal");
+    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
 
     /** */
-    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal\\.tmp");
+    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
 
     /** */
     private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
@@ -322,6 +322,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
             currentHnd = restoreWriteHandle(filePtr);
 
+            if (currentHnd.serializer.version() != serializer.version()) {
+                if (log.isInfoEnabled())
+                    log.info("Record serializer version change detected, will start logging
with a new WAL record " +
+                        "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer="
+ serializer.version() +
+                        ", oldVer=" + currentHnd.serializer.version() + ']');
+
+                rollOver(currentHnd);
+            }
+
             if (mode == Mode.BACKGROUND) {
                 flusher = new QueueFlusher(cctx.igniteInstanceName());
 
@@ -443,7 +452,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
     }
 
     private boolean hasIndex(long absIdx) {
-        String name = FileDescriptor.fileName(absIdx, serializer.version());
+        String name = FileDescriptor.fileName(absIdx);
 
         boolean inArchive = new File(walArchiveDir, name).exists();
 
@@ -569,29 +578,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
     private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException
{
         long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
-        archiver.currentWalIndex(absIdx);
-
         long segNo = absIdx % dbCfg.getWalSegments();
 
-        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version()));
+        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
 
         int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
 
-        log.info("Resuming logging in WAL segment [file=" + curFile.getAbsolutePath() +
-            ", offset=" + offset + ']');
-
         try {
             RandomAccessFile file = new RandomAccessFile(curFile, "rw");
 
             try {
+                // readSerializerVersion will change the channel position.
+                // This is fine because the FileWriteHandle consitructor will move it
+                // to offset + len anyways.
+                int serVer = readSerializerVersion(file, curFile, absIdx);
+
+                RecordSerializer ser = forVersion(cctx, serVer);
+
+                if (log.isInfoEnabled())
+                    log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath()
+
+                        ", offset=" + offset + ", ver=" + serVer + ']');
+
                 FileWriteHandle hnd = new FileWriteHandle(
                     file,
                     absIdx,
                     cctx.igniteInstanceName(),
                     offset + len,
                     maxWalSegmentSize,
-                    serializer);
+                    ser);
 
                 if (lastReadPtr == null) {
                     HeaderRecord header = new HeaderRecord(serializer.version());
@@ -601,6 +616,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
                     hnd.addRecord(header);
                 }
 
+                archiver.currentWalIndex(absIdx);
+
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
@@ -681,7 +698,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
         // Allocate the first segment synchronously. All other segments will be allocated
by archiver in background.
         if (allFiles.length == 0) {
-            File first = new File(walWorkDir, FileDescriptor.fileName(0, serializer.version()));
+            File first = new File(walWorkDir, FileDescriptor.fileName(0));
 
             createFile(first);
         }
@@ -761,7 +778,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
         long segmentIdx = absNextIdx % dbCfg.getWalSegments();
 
-        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version()));
+        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
     }
 
     /**
@@ -1072,9 +1089,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
         private File archiveSegment(long absIdx) throws IgniteCheckedException {
             long segIdx = absIdx % dbCfg.getWalSegments();
 
-            File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version()));
+            File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
 
-            String name = FileDescriptor.fileName(absIdx, serializer.version());
+            String name = FileDescriptor.fileName(absIdx);
 
             File dstTmpFile = new File(walArchiveDir, name + ".tmp");
 
@@ -1163,7 +1180,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
      */
     private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer>
p) throws IgniteCheckedException {
         for (int i = startWith; i < dbCfg.getWalSegments() && (p == null || (p
!= null && p.apply(i))); i++) {
-            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i, serializer.version()));
+            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
 
             if (checkFile.exists()) {
                 if (checkFile.isDirectory())
@@ -1179,6 +1196,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
     }
 
     /**
+     * @param rf Random access file.
+     * @param file File object.
+     * @param idx File index to read.
+     * @return Serializer version stored in the file.
+     * @throws IOException If failed to read serializer version.
+     * @throws IgniteCheckedException If failed to read serializer version.
+     */
+    private int readSerializerVersion(RandomAccessFile rf, File file, long idx)
+        throws IOException, IgniteCheckedException {
+        try {
+            ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+            buf.order(ByteOrder.nativeOrder());
+
+            FileInput in = new FileInput(rf.getChannel(), buf);
+
+            // Header record must be agnostic to the serializer version.
+            WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
+
+            if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
+                throw new IOException("Missing file header record: " + file.getAbsoluteFile());
+
+            return ((HeaderRecord)rec).version();
+        }
+        catch (SegmentEofException | EOFException ignore) {
+            return serializer.version();
+        }
+    }
+
+    /**
      * WAL file descriptor.
      */
     private static class FileDescriptor implements Comparable<FileDescriptor> {
@@ -1188,9 +1234,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
         /** Absolute WAL segment file index */
         protected final long idx;
 
-        /** */
-        protected final int ver;
-
         /**
          * @param file File.
          */
@@ -1209,27 +1252,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
             assert fileName.endsWith(WAL_SEGMENT_FILE_EXT);
 
-            int v = fileName.lastIndexOf(".v");
-
-            assert v > 0;
-
-            int begin = v + 2;
             int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
 
             if (idx == null)
-                this.idx = Long.parseLong(fileName.substring(0, v));
+                this.idx = Long.parseLong(fileName.substring(0, end));
             else
                 this.idx = idx;
-
-            ver = Integer.parseInt(fileName.substring(begin, end));
         }
 
         /**
          * @param segment Segment index.
-         * @param ver Serializer version.
          * @return Segment file name.
          */
-        private static String fileName(long segment, int ver) {
+        private static String fileName(long segment) {
             SB b = new SB();
 
             String segmentStr = Long.toString(segment);
@@ -1237,7 +1272,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
             for (int i = segmentStr.length(); i < 16; i++)
                 b.a('0');
 
-            b.a(segmentStr).a(".v").a(ver).a(WAL_SEGMENT_FILE_EXT);
+            b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT);
 
             return b.toString();
         }
@@ -2232,13 +2267,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
             if (readArchive) {
                 fd = new FileDescriptor(new File(walArchiveDir,
-                    FileDescriptor.fileName(curIdx, serializer.version())));
+                    FileDescriptor.fileName(curIdx)));
             }
             else {
                 long workIdx = curIdx % dbCfg.getWalSegments();
 
                 fd = new FileDescriptor(
-                    new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())),
+                    new File(walWorkDir, FileDescriptor.fileName(workIdx)),
                     curIdx);
             }
 
@@ -2278,11 +2313,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
                 RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
 
                 try {
-                    RecordSerializer ser = forVersion(cctx, desc.ver);
                     FileChannel channel = rf.getChannel();
                     FileInput in = new FileInput(channel, buf);
 
-                    WALRecord rec = ser.readRecord(in,
+                    // Header record must be agnostic to the serializer version.
+                    WALRecord rec = serializer.readRecord(in,
                         new FileWALPointer(desc.idx, (int)channel.position(), 0));
 
                     if (rec == null)
@@ -2293,9 +2328,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter
impl
 
                     int ver = ((HeaderRecord)rec).version();
 
-                    if (ver != ser.version())
-                        throw new IOException("Unexpected file format version: " + ver +
", " +
-                            desc.file.getAbsoluteFile());
+                    RecordSerializer ser = forVersion(cctx, ver);
 
                     if (start != null && desc.idx == start.index())
                         in.seek(start.fileOffset());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b47db106/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
index 1c56338..f39cdfd 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
@@ -105,6 +106,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
  */
 public class RecordV1Serializer implements RecordSerializer {
     /** */
+    public static final int HEADER_RECORD_SIZE = /*Type*/1 + /*Pointer */12 + /*Magic*/8
+ /*Version*/4 + /*CRC*/4;
+
+    /** */
     private GridCacheSharedContext cctx;
 
     /** */
@@ -779,8 +783,11 @@ public class RecordV1Serializer implements RecordSerializer {
                 break;
 
             case HEADER_RECORD:
-                if (in.readLong() != HeaderRecord.MAGIC)
-                    throw new EOFException("Magic is corrupted.");
+                long magic = in.readLong();
+
+                if (magic != HeaderRecord.MAGIC)
+                    throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC)
+
+                        ", actual=" + U.hexLong(magic) + ']');
 
                 int ver = in.readInt();
 
@@ -1246,7 +1253,7 @@ public class RecordV1Serializer implements RecordSerializer {
                 return commonFields + 4 + dataSize(dataRec);
 
             case HEADER_RECORD:
-                return commonFields + 12;
+                return HEADER_RECORD_SIZE;
 
             case DATA_PAGE_INSERT_RECORD:
                 DataPageInsertRecord diRec = (DataPageInsertRecord)record;


Mime
View raw message