ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/13] ignite git commit: IGNITE-5558 - Added ability to read WAL in standalone mode - Fixes #2174.
Date Wed, 05 Jul 2017 04:23:07 GMT
IGNITE-5558 - Added ability to read WAL in standalone mode - Fixes #2174.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44f3fac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44f3fac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44f3fac2

Branch: refs/heads/ignite-gg-12389
Commit: 44f3fac27bec89b5e70e87564c527e48565ddd2a
Parents: ee7566b
Author: dpavlov <dpavlov@gridgain.com>
Authored: Tue Jul 4 20:23:40 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Tue Jul 4 20:23:40 2017 +0300

----------------------------------------------------------------------
 .../PersistentStoreConfiguration.java           |  39 +-
 .../org/apache/ignite/events/EventType.java     |  12 +
 .../ignite/events/WalSegmentArchivedEvent.java  |  62 ++
 .../internal/pagemem/wal/record/WALRecord.java  |  11 +-
 .../IgniteCacheDatabaseSharedManager.java       |  10 +-
 .../wal/AbstractWalRecordsIterator.java         | 289 +++++++++
 .../cache/persistence/wal/FileInput.java        |  16 +-
 .../cache/persistence/wal/FileWALPointer.java   |   4 +-
 .../wal/FileWriteAheadLogManager.java           | 586 +++++++++----------
 .../cache/persistence/wal/RecordSerializer.java |   5 +
 .../persistence/wal/SegmentArchiveResult.java   |  61 ++
 .../persistence/wal/SegmentEofException.java    |   3 +-
 .../wal/reader/IgniteWalIteratorFactory.java    | 102 ++++
 .../wal/reader/StandaloneGridKernalContext.java | 499 ++++++++++++++++
 ...ndaloneIgniteCacheDatabaseSharedManager.java |  30 +
 .../reader/StandaloneWalRecordsIterator.java    | 258 ++++++++
 .../wal/serializer/RecordV1Serializer.java      |  45 +-
 ...IgnitePersistentStoreDataStructuresTest.java |   2 +
 .../wal/IgniteWalHistoryReservationsTest.java   |   2 +-
 .../db/wal/reader/IgniteWalReaderTest.java      | 385 ++++++++++++
 .../db/wal/reader/MockWalIteratorFactory.java   | 114 ++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   9 +-
 22 files changed, 2194 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index 1d41d41..b531f9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -55,7 +55,7 @@ public class PersistentStoreConfiguration implements Serializable {
     /** */
     public static final int DFLT_WAL_SEGMENTS = 10;
 
-    /** */
+    /** Default WAL file segment size, 64MBytes */
     public static final int DFLT_WAL_SEGMENT_SIZE = 64 * 1024 * 1024;
 
     /** Default wal mode. */
@@ -103,10 +103,10 @@ public class PersistentStoreConfiguration implements Serializable {
     /** Number of work WAL segments. */
     private int walSegments = DFLT_WAL_SEGMENTS;
 
-    /** Number of WAL segments to keep. */
+    /** Size of one WAL segment in bytes. 64 MB is used by default. Maximum value is 2 GB. */
     private int walSegmentSize = DFLT_WAL_SEGMENT_SIZE;
 
-    /** WAL persistence path. */
+    /** Directory where WAL is stored (work directory) */
     private String walStorePath = DFLT_WAL_STORE_PATH;
 
     /** WAL archive path. */
@@ -121,7 +121,7 @@ public class PersistentStoreConfiguration implements Serializable {
     /** WAl thread local buffer size. */
     private int tlbSize = DFLT_TLB_SIZE;
 
-    /** Wal flush frequency. */
+    /** Wal flush frequency in milliseconds. */
     private int walFlushFreq = DFLT_WAL_FLUSH_FREQ;
 
     /** Wal fsync delay. */
@@ -147,6 +147,11 @@ public class PersistentStoreConfiguration implements Serializable {
     private long rateTimeInterval = DFLT_RATE_TIME_INTERVAL_MILLIS;
 
     /**
+     * Time interval (in milliseconds) of inactivity after which an incomplete WAL segment is archived automatically.
+     */
+    private long walAutoArchiveAfterInactivity = -1;
+
+    /**
      * Returns a path the root directory where the Persistent Store will persist data and indexes.
      */
     public String getPersistentStorePath() {
@@ -297,7 +302,7 @@ public class PersistentStoreConfiguration implements Serializable {
     }
 
     /**
-     * Gets size of a WAL segment.
+     * Gets size of a WAL segment in bytes.
      *
      * @return WAL segment size.
      */
@@ -308,7 +313,7 @@ public class PersistentStoreConfiguration implements Serializable {
     /**
      * Sets size of a WAL segment.
      *
-     * @param walSegmentSize WAL segment size. 64 MB is used by default.
+     * @param walSegmentSize WAL segment size. 64 MB is used by default.  Maximum value is 2Gb
      * @return {@code this} for chaining.
      */
     public PersistentStoreConfiguration setWalSegmentSize(int walSegmentSize) {
@@ -533,6 +538,28 @@ public class PersistentStoreConfiguration implements Serializable {
         return this;
     }
 
+    /**
+     * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case
+     * grid is used rarely.
+     *
+     * @param walAutoArchiveAfterInactivity time in millis to run auto archiving segment (even if incomplete) after last
+     * record logging. <br> Positive value enables incomplete segment archiving after timeout (inactivity). <br> Zero or
+     * negative  value disables auto archiving.
+     * @return current configuration instance for chaining
+     */
+    public PersistentStoreConfiguration setWalAutoArchiveAfterInactivity(long walAutoArchiveAfterInactivity) {
+        this.walAutoArchiveAfterInactivity = walAutoArchiveAfterInactivity;
+
+        return this;
+    }
+
+    /**
+     * @return time in millis to run auto archiving WAL segment (even if incomplete) after last record log
+     */
+    public long getWalAutoArchiveAfterInactivity() {
+        return walAutoArchiveAfterInactivity;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(PersistentStoreConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 1960692..47b4089 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -767,6 +767,18 @@ public interface EventType {
     public static final int EVT_IGFS_FILE_PURGED = 127;
 
     /**
+     * Built-in event type: WAL segment movement to archive folder completed
+     * <p>
+     * Fired for each completed WAL segment which was moved to archive
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see WalSegmentArchivedEvent
+     */
+    public static final int EVT_WAL_SEGMENT_ARCHIVED = 128;
+
+    /**
      * All checkpoint events. This array can be directly passed into
      * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
      * subscribe to all checkpoint events.

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
new file mode 100644
index 0000000..2fc1715
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import java.io.File;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Event indicating that the movement of a WAL segment file to the archive has been completed.
+ */
+public class WalSegmentArchivedEvent extends EventAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Absolute WAL segment file index. */
+    private long absWalSegmentIdx;
+
+    /** Destination archive file. This file is completed and closed archive segment */
+    private final File archiveFile;
+
+    /**
+     * Creates WAL segment event
+     *
+     * @param node Node.
+     * @param absWalSegmentIdx Absolute wal segment index.
+     * @param archiveFile Archive file.
+     */
+    public WalSegmentArchivedEvent(
+        @NotNull final ClusterNode node,
+        final long absWalSegmentIdx,
+        final File archiveFile) {
+        super(node, "", EventType.EVT_WAL_SEGMENT_ARCHIVED);
+        this.absWalSegmentIdx = absWalSegmentIdx;
+        this.archiveFile = archiveFile;
+    }
+
+    /** @return {@link #archiveFile} */
+    public File getArchiveFile() {
+        return archiveFile;
+    }
+
+    /** @return {@link #absWalSegmentIdx} */
+    public long getAbsWalSegmentIdx() {
+        return absWalSegmentIdx;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 678e1fa..89f3c86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
+import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public abstract class WALRecord {
     /**
-     * Record type.
+     * Record type. Ordinal of this record will be written to file. <br>
+     * <b>Note:</b> Do not change order of elements <br>
      */
     public enum RecordType {
         /** */
@@ -171,6 +173,13 @@ public abstract class WALRecord {
         public static RecordType fromOrdinal(int ord) {
             return ord < 0 || ord >= VALS.length ? null : VALS[ord];
         }
+
+        /**
+         * Fake record type, causes stop iterating and indicates segment EOF
+         * <b>Note:</b> regular record type is incremented by 1 and minimal value written to file is also 1
+         * For {@link WALMode#DEFAULT} this value comes at least from padding
+         */
+        public static final int STOP_ITERATION_RECORD_TYPE = 0;
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index ec0e895..f04c278 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -92,7 +92,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     /** */
     private FreeListImpl dfltFreeList;
 
-    /** */
+    /** Page size from memory configuration, may be set only for fake (standalone) IgniteCacheDatabaseSharedManager */
     private int pageSize;
 
     /** {@inheritDoc} */
@@ -961,4 +961,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     public String systemMemoryPolicyName() {
         return SYSTEM_MEMORY_POLICY_NAME;
     }
+
+    /**
+     * Method for fake (standalone) context initialization. Not to be called in production code
+     * @param pageSize configured page size
+     */
+    protected void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
new file mode 100644
index 0000000..7dc0a28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Iterator over WAL segments. This abstract class provides most functionality for reading records in log.
+ * Subclasses are to override segment switching functionality
+ */
+public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
+    implements WALIterator {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Current record preloaded, to be returned on next()<br>
+     * Normally this should not be null because the advance() method should already have prepared some value<br>
+     */
+    protected IgniteBiTuple<WALPointer, WALRecord> curRec;
+
+    /**
+     * Current WAL segment absolute index. <br>
+     * Determined as lowest number of file at start, is changed during advance segment
+     */
+    protected long curWalSegmIdx = -1;
+
+    /**
+     * Current WAL segment read file handle. To be filled by subclass advanceSegment
+     */
+    private FileWriteAheadLogManager.ReadFileHandle currWalSegment;
+
+    /** Logger */
+    @NotNull protected final IgniteLogger log;
+
+    /** Shared context for creating serializer of required version and grid name access */
+    @NotNull private final GridCacheSharedContext sharedCtx;
+
+    /** Serializer of current version to read headers. */
+    @NotNull private final RecordSerializer serializer;
+
+    /** Utility buffer for reading records */
+    private final ByteBuffer buf;
+
+    /**
+     * @param log Logger
+     * @param sharedCtx Shared context
+     * @param serializer Serializer of current version to read headers.
+     * @param bufSize buffer for reading records size
+     */
+    protected AbstractWalRecordsIterator(
+        @NotNull final IgniteLogger log,
+        @NotNull final GridCacheSharedContext sharedCtx,
+        @NotNull final RecordSerializer serializer,
+        final int bufSize) {
+        this.log = log;
+        this.sharedCtx = sharedCtx;
+        this.serializer = serializer;
+
+        // Do not allocate direct buffer for iterator.
+        buf = ByteBuffer.allocate(bufSize);
+        buf.order(ByteOrder.nativeOrder());
+
+    }
+
+    /**
+     * Scans the provided folder for WAL segment files
+     * @param walFilesDir directory to scan
+     * @return found WAL file descriptors
+     */
+    protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException {
+        final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+
+        if (files == null) {
+            throw new IgniteCheckedException("WAL files directory does not not denote a " +
+                "directory, or if an I/O error occurs: [" + walFilesDir.getAbsolutePath() + "]");
+        }
+        return FileWriteAheadLogManager.scan(files);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
+        IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
+
+        advance();
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onHasNext() throws IgniteCheckedException {
+        return curRec != null;
+    }
+
+    /**
+     * Switches records iterator to the next record.
+     * <ul>
+     * <li>{@link #curRec} will be updated.</li>
+     * <li> If end of segment reached, switch to new segment is called. {@link #currWalSegment} will be updated.</li>
+     * </ul>
+     *
+     * {@code advance()} runs a step ahead {@link #next()}
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void advance() throws IgniteCheckedException {
+        while (true) {
+            curRec = advanceRecord(currWalSegment);
+
+            if (curRec != null)
+                return;
+            else {
+                currWalSegment = advanceSegment(currWalSegment);
+
+                if (currWalSegment == null)
+                    return;
+            }
+        }
+    }
+
+    /**
+     * Closes and returns WAL segment (if any)
+     * @return closed handle
+     * @throws IgniteCheckedException if IO failed
+     */
+    @Nullable protected FileWriteAheadLogManager.ReadFileHandle closeCurrentWalSegment() throws IgniteCheckedException {
+        final FileWriteAheadLogManager.ReadFileHandle walSegmentClosed = currWalSegment;
+
+        if (walSegmentClosed != null) {
+            walSegmentClosed.close();
+            currWalSegment = null;
+        }
+        return walSegmentClosed;
+    }
+
+    /**
+     * Switches records iterator to the next WAL segment
+     * as result of this method, new reference to segment should be returned.
+     * A {@code null} result means that iteration must stop
+     * @throws IgniteCheckedException if reading failed
+     * @param curWalSegment current open WAL segment or null if there is no open segment yet
+     * @return new WAL segment to read or null for stop iteration
+     */
+    protected abstract FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException;
+
+    /**
+     * Switches to new record
+     * @param hnd currently opened read handle
+     * @return next advanced record
+     */
+    private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) {
+        if (hnd == null)
+            return null;
+
+        final FileWALPointer ptr = new FileWALPointer(
+            hnd.idx,
+            (int)hnd.in.position(),
+            0);
+
+        try {
+            final WALRecord rec = hnd.ser.readRecord(hnd.in, ptr);
+
+            ptr.length(rec.size());
+
+            // cast using diamond operator here can break compile for 7
+            return new IgniteBiTuple<>((WALPointer)ptr, rec);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            if (!(e instanceof SegmentEofException))
+                handleRecordException(e, ptr);
+            return null;
+        }
+    }
+
+    /**
+     * Handler for record deserialization exception
+     * @param e problem from records reading
+     * @param ptr file pointer that was being accessed
+     */
+    protected void handleRecordException(
+        @NotNull final Exception e,
+        @Nullable final FileWALPointer ptr) {
+        if (log.isInfoEnabled())
+            log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
+    }
+
+    /**
+     * @param desc File descriptor.
+     * @param start Optional start pointer. Null means read from the beginning
+     * @return Initialized file handle.
+     * @throws FileNotFoundException If segment file is missing.
+     * @throws IgniteCheckedException If initialization failed due to another unexpected error.
+     */
+    protected FileWriteAheadLogManager.ReadFileHandle initReadHandle(
+        @NotNull final FileWriteAheadLogManager.FileDescriptor desc,
+        @Nullable final FileWALPointer start)
+        throws IgniteCheckedException, FileNotFoundException {
+        try {
+            RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
+
+            try {
+                FileChannel ch = rf.getChannel();
+                FileInput in = new FileInput(ch, buf);
+
+                // Header record must be agnostic to the serializer version.
+                WALRecord rec = serializer.readRecord(in,
+                    new FileWALPointer(desc.idx, (int)ch.position(), 0));
+
+                if (rec == null)
+                    return null;
+
+                if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
+                    throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
+
+                int ver = ((HeaderRecord)rec).version();
+
+                RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver);
+
+                if (start != null && desc.idx == start.index())
+                    in.seek(start.fileOffset());
+
+                return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+            }
+            catch (SegmentEofException | EOFException ignore) {
+                try {
+                    rf.close();
+                }
+                catch (IOException ce) {
+                    throw new IgniteCheckedException(ce);
+                }
+
+                return null;
+            }
+            catch (IOException | IgniteCheckedException e) {
+                try {
+                    rf.close();
+                }
+                catch (IOException ce) {
+                    e.addSuppressed(ce);
+                }
+
+                throw e;
+            }
+        }
+        catch (FileNotFoundException e) {
+            throw e;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(
+                "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index be1e477..e2d7cba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -26,21 +26,25 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaC
 import org.jetbrains.annotations.NotNull;
 
 /**
- * File input.
+ * File input, backed by byte buffer file input.
+ * This class allows to read data by chunks from file and then read primitives
  */
 public final class FileInput implements ByteBufferBackedDataInput {
-    /** */
+    /**
+     * Buffer for reading blocks of data into.
+     * <b>Note:</b> biggest block requested from this input can't be longer than buffer capacity
+     */
     private ByteBuffer buf;
 
-    /** */
+    /** File channel to read chunks from */
     private FileChannel ch;
 
     /** */
     private long pos;
 
     /**
-     * @param ch  Channel.
-     * @param buf Buffer.
+     * @param ch  Channel to read from
+     * @param buf Buffer for reading blocks of data into
      */
     public FileInput(FileChannel ch, ByteBuffer buf) throws IOException {
         assert ch != null;
@@ -101,7 +105,7 @@ public final class FileInput implements ByteBufferBackedDataInput {
             int read = ch.read(buf);
 
             if (read == -1)
-                throw new EOFException();
+                throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes");
 
             available += read;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
index b6ddfb8..3716de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
@@ -46,7 +46,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
     }
 
     /**
-     * @param idx File timestamp index.
+     * @param idx Absolute WAL segment file index.
      * @param fileOffset Offset in file, from the beginning.
      * @param len Record length.
      * @param forceFlush Force flush flag.
@@ -59,7 +59,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
     }
 
     /**
-     * @return Timestamp index.
+     * @return Absolute WAL segment file index.
      */
     public long index() {
         return idx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5918141..f877a14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.sql.Time;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,11 +35,11 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import org.apache.ignite.IgniteCheckedException;
@@ -46,8 +47,10 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -58,9 +61,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -69,6 +74,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -85,14 +91,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private static final byte[] FILL_BUF = new byte[1024 * 1024];
 
-    /** */
+    /** Pattern for segment file names */
     private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
 
     /** */
     private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
 
-    /** */
-    private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
+    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+    public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
         @Override public boolean accept(File file) {
             return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
         }
@@ -118,7 +124,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private final int tlbSize;
 
     /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */
-    public final int flushFreq;
+    private final int flushFreq;
 
     /** Fsync delay. */
     private final long fsyncDelay;
@@ -126,6 +132,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private final PersistentStoreConfiguration psCfg;
 
+    /** Events service */
+    private final GridEventStorageManager evt;
+
     /** */
     private IgniteConfiguration igCfg;
 
@@ -135,10 +144,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private File walWorkDir;
 
-    /** */
+    /** WAL archive directory (including consistent ID as subfolder) */
     private File walArchiveDir;
 
-    /** */
+    /** Serializer of current version, used to read header records and to write records */
     private RecordSerializer serializer;
 
     /** */
@@ -167,18 +176,41 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private volatile FileArchiver archiver;
 
     /** */
-    private QueueFlusher flusher;
-
-    /** */
     private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
 
     /** Current log segment handle */
     private volatile FileWriteHandle currentHnd;
 
     /**
+     * Positive (non-0) value indicates WAL can be archived even if not complete<br>
+     * See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
+     */
+    private final long walAutoArchiveAfterInactivity;
+
+    /**
+     * Container with last WAL record logged timestamp.<br>
+     * Zero value means no records were logged to the current segment, so possible archiving is skipped<br>
+     * Value is filled only if {@link #walAutoArchiveAfterInactivity} > 0<br>
+     */
+    private AtomicLong lastRecordLoggedMs = new AtomicLong();
+
+    /**
+     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown.
+     * Null for non-background modes.
+     */
+    @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
+
+    /**
+     * Reference to the last added next archive timeout check object.
+     * Null if mode is not enabled.
+     * Should be cancelled at shutdown
+     */
+    @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
+
+    /**
      * @param ctx Kernal context.
      */
-    public FileWriteAheadLogManager(GridKernalContext ctx) {
+    public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
         igCfg = ctx.config();
 
         PersistentStoreConfiguration psCfg = igCfg.getPersistentStoreConfiguration();
@@ -193,6 +225,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         flushFreq = psCfg.getWalFlushFrequency();
         fsyncDelay = psCfg.getWalFsyncDelay();
         alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages();
+        walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity();
+        evt = ctx.event();
     }
 
     /** {@inheritDoc} */
@@ -248,8 +282,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (psCfg.getWalStorePath() == null ^ psCfg.getWalArchivePath() == null) {
             throw new IgniteCheckedException(
                 "Properties should be either both specified or both null " +
-                "[walStorePath = " + psCfg.getWalStorePath() +
-                ", walArchivePath = " + psCfg.getWalArchivePath() + "]"
+                    "[walStorePath = " + psCfg.getWalStorePath() +
+                    ", walArchivePath = " + psCfg.getWalArchivePath() + "]"
             );
         }
     }
@@ -271,26 +305,32 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override protected void stop0(boolean cancel) {
-        FileWriteHandle currentHnd = currentHandle();
+        final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
 
-        try {
-            QueueFlusher flusher0 = flusher;
+        if (schedule != null)
+            schedule.close();
 
-            if (flusher0 != null) {
-                flusher0.shutdown();
+        final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
 
-                if (currentHnd != null)
-                    currentHnd.flush((FileWALPointer)null);
+        if (timeoutObj != null)
+            cctx.time().removeTimeoutObject(timeoutObj);
+
+        final FileWriteHandle currHnd = currentHandle();
+
+        try {
+            if (mode == WALMode.BACKGROUND) {
+                if (currHnd != null)
+                    currHnd.flush((FileWALPointer)null);
             }
 
-            if (currentHnd != null)
-                currentHnd.close(false);
+            if (currHnd != null)
+                currHnd.close(false);
 
             if (archiver != null)
                 archiver.shutdown();
         }
         catch (Exception e) {
-            U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.file, e);
+            U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e);
         }
     }
 
@@ -350,39 +390,114 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
 
             if (mode == WALMode.BACKGROUND) {
-                flusher = new QueueFlusher(cctx.igniteInstanceName());
-
-                flusher.start();
+                backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
+                    @Override public void run() {
+                        doFlush();
+                    }
+                }, flushFreq, flushFreq);
             }
+
+            if (walAutoArchiveAfterInactivity > 0)
+                scheduleNextInactivityPeriodElapsedCheck();
         }
         catch (StorageException e) {
             throw new IgniteCheckedException(e);
         }
     }
 
+    /**
+     * Schedules the next inactivity period check, based on the last logged record timestamp
+     * (or current time if none). On timeout, the inactivity check is performed and the next check is scheduled.
+     */
+    private void scheduleNextInactivityPeriodElapsedCheck() {
+        final long lastRecMs = lastRecordLoggedMs.get();
+        final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
+
+        if (log.isDebugEnabled())
+            log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
+
+        nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
+            private final IgniteUuid id = IgniteUuid.randomUuid();
+
+            @Override public IgniteUuid timeoutId() {
+                return id;
+            }
+
+            @Override public long endTime() {
+                return nextPossibleAutoArchive;
+            }
+
+            @Override public void onTimeout() {
+                if (log.isDebugEnabled())
+                    log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
+
+                checkWalRolloverRequiredDuringInactivityPeriod();
+
+                scheduleNextInactivityPeriodElapsedCheck();
+            }
+        };
+        cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
+    }
+
+    /**
+     * Checks whether a significant period of inactivity has elapsed.
+     * If WAL auto-archive is enabled ({@link #walAutoArchiveAfterInactivity} > 0), this method triggers
+     * segment rollover by timeout<br>
+     */
+    private void checkWalRolloverRequiredDuringInactivityPeriod() {
+        if (walAutoArchiveAfterInactivity <= 0)
+            return; // feature not configured, nothing to do
+
+        final long lastRecMs = lastRecordLoggedMs.get();
+
+        if (lastRecMs == 0)
+            return; //no records were logged to current segment, does not consider inactivity
+
+        final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+
+        if (elapsedMs <= walAutoArchiveAfterInactivity)
+            return; // not enough time elapsed since last write
+
+        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
+            return; // record write occurred concurrently
+
+        final FileWriteHandle handle = currentHandle();
+
+        try {
+            rollOver(handle);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
+            handle.invalidateEnvironment(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("TooBroadScope")
     @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException {
         if (serializer == null || mode == WALMode.NONE)
             return null;
 
-        FileWriteHandle current = currentHandle();
+        FileWriteHandle currWrHandle = currentHandle();
 
         // Logging was not resumed yet.
-        if (current == null)
+        if (currWrHandle == null)
             return null;
 
         // Need to calculate record size first.
         record.size(serializer.size(record));
 
-        for (; ; current = rollOver(current)) {
-            WALPointer ptr = current.addRecord(record);
+        for (; ; currWrHandle = rollOver(currWrHandle)) {
+            WALPointer ptr = currWrHandle.addRecord(record);
 
             if (ptr != null) {
                 metrics.onWalRecordLogged();
 
                 lastWALPtr.set(ptr);
 
+                if (walAutoArchiveAfterInactivity > 0)
+                    lastRecordLoggedMs.set(U.currentTimeMillis());
+
                 return ptr;
             }
 
@@ -665,6 +780,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             assert swapped : "Concurrent updates on rollover are not allowed";
 
+            if (walAutoArchiveAfterInactivity > 0)
+                lastRecordLoggedMs.set(0);
+
             // Let other threads to proceed with new segment.
             hnd.signalNextAvailable();
         }
@@ -888,7 +1006,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @param ver Serializer version.
      * @return Entry serializer.
      */
-    private static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
+    static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException {
         if (ver <= 0)
             throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file).");
 
@@ -905,7 +1023,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      * @return Sorted WAL files descriptors.
      */
-    private static FileDescriptor[] scan(File[] allFiles) {
+    public static FileDescriptor[] scan(File[] allFiles) {
         if (allFiles == null)
             return EMPTY_DESCRIPTORS;
 
@@ -931,11 +1049,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      *
      * Monitor of current object is used for notify on:
      * <ul>
-     *     <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
-     *     <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
-     *     <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
-     *     <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
-     *     <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
+     * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
+     * <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
+     * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
+     * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
+     * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
      * </ul>
      */
     private class FileArchiver extends Thread {
@@ -1017,6 +1135,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * Check if WAL segment locked or reserved
+         *
          * @param absIdx Index for check reservation.
          * @return {@code True} if index is reserved.
          */
@@ -1080,7 +1199,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         break;
 
                     try {
-                        File workFile = archiveSegment(toArchive);
+                        final SegmentArchiveResult res = archiveSegment(toArchive);
 
                         synchronized (this) {
                             while (locked.containsKey(toArchive) && !stopped)
@@ -1088,13 +1207,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                             // Firstly, format working file
                             if (!stopped)
-                                formatFile(workFile);
+                                formatFile(res.getOrigWorkFile());
 
                             // Then increase counter to allow rollover on clean working file
                             lastAbsArchivedIdx = toArchive;
 
                             notifyAll();
                         }
+                        if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED))
+                            evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
+                                res.getAbsIdx(), res.getDstArchiveFile()));
                     }
                     catch (IgniteCheckedException e) {
                         synchronized (this) {
@@ -1115,7 +1237,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * Blocks till there are available file to write
          *
          * @param curIdx Current absolute index that we want to increment.
-         * @return Next index (curIdx+1) when it is ready to be written.
+         * @return Next index (curIdx+1) when it is ready to be written.
          * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
          */
         private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
@@ -1195,9 +1317,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
+         * Moves WAL segment from work folder to archive folder.
+         * Temp file is used to do movement
+         *
          * @param absIdx Absolute index to archive.
          */
-        private File archiveSegment(long absIdx) throws IgniteCheckedException {
+        private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
             long segIdx = absIdx % psCfg.getWalSegments();
 
             File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
@@ -1235,7 +1360,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 log.debug("Copied file [src=" + origFile.getAbsolutePath() +
                     ", dst=" + dstFile.getAbsolutePath() + ']');
 
-            return origFile;
+            return new SegmentArchiveResult(absIdx, origFile, dstFile);
         }
 
         /**
@@ -1316,7 +1441,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      * WAL file descriptor.
      */
-    private static class FileDescriptor implements Comparable<FileDescriptor> {
+    public static class FileDescriptor implements Comparable<FileDescriptor> {
         /** */
         protected final File file;
 
@@ -1324,9 +1449,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         protected final long idx;
 
         /**
-         * @param file File.
+         * Creates file descriptor. Index is restored from file name
+         *
+         * @param file WAL segment file.
          */
-        private FileDescriptor(File file) {
+        public FileDescriptor(@NotNull File file) {
             this(file, null);
         }
 
@@ -1334,7 +1461,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param file WAL segment file.
          * @param idx Absolute WAL segment file index. For null value index is restored from file name
          */
-        private FileDescriptor(@NotNull File file, @Nullable Long idx) {
+        public FileDescriptor(@NotNull File file, @Nullable Long idx) {
             this.file = file;
 
             String fileName = file.getName();
@@ -1350,7 +1477,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param segment Segment index.
          * @return Segment file name.
          */
-        private static String fileName(long segment) {
+        public static String fileName(long segment) {
             SB b = new SB();
 
             String segmentStr = Long.toString(segment);
@@ -1402,6 +1529,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         @Override public int hashCode() {
             return (int)(idx ^ (idx >>> 32));
         }
+
+        /**
+         * @return Absolute WAL segment file index
+         */
+        public long getIdx() {
+            return idx;
+        }
+
+        /**
+         * @return Absolute pathname string of this file descriptor's file.
+         */
+        public String getAbsolutePath() {
+            return file.getAbsolutePath();
+        }
     }
 
     /**
@@ -1438,14 +1579,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      *
      */
-    private static class ReadFileHandle extends FileHandle {
+    public static class ReadFileHandle extends FileHandle {
         /** Entry serializer. */
-        private RecordSerializer ser;
+        RecordSerializer ser;
 
         /** */
-        private FileInput in;
+        FileInput in;
 
-        /** */
+        /**
+         * <code>true</code> if this file handle came from work directory.
+         * <code>false</code> if this file handle came from archive directory.
+         */
         private boolean workDir;
 
         /**
@@ -1454,7 +1598,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param ser Entry serializer.
          * @param in File input.
          */
-        private ReadFileHandle(
+        ReadFileHandle(
             RandomAccessFile file,
             long idx,
             String gridName,
@@ -1499,7 +1643,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         private final AtomicReference<WALRecord> head = new AtomicReference<>();
 
-        /** Position in current file after the end of last written record (incremented after file channel write operation) */
+        /**
+         * Position in current file after the end of last written record (incremented after file channel write
+         * operation)
+         */
         private volatile long written;
 
         /** */
@@ -1508,7 +1655,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** Environment failure. */
         private volatile Throwable envFailed;
 
-        /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/
+        /** Stop guard to guarantee that only one thread will be successful in calling {@link #close(boolean)} */
         private final AtomicBoolean stop = new AtomicBoolean(false);
 
         /** */
@@ -1754,6 +1901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * Serializes WAL records chain to provided byte buffer
+         *
          * @param buf Buffer, will be filled with records chain from end to beginning
          * @param head Head of the chain to write to the buffer.
          * @return Position in file for this buffer.
@@ -1886,11 +2034,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     flushOrWait(null);
 
                 try {
-                    if (rollOver && written < (maxSegmentSize - 1)) {
-                        ByteBuffer allocate = ByteBuffer.allocate(1);
-                        allocate.put((byte) WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal());
-
-                        ch.write(allocate, written);
+                    int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+                    if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+                        //it is expected there is sufficient space for this record because rollover should run early
+                        final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+                        buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
+                        final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1);
+                        RecordV1Serializer.putPosition(buf, pointer);
+                        buf.rewind();
+                        ch.write(buf, written);
 
                         if (mode == WALMode.DEFAULT)
                             ch.force(false);
@@ -1951,8 +2103,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param pos Position in file to start write from.
-         * May be checked against actual position to wait previous writes to complete
+         * @param pos Position in file to start write from. May be checked against actual position to wait previous
+         * writes to complete
          * @param buf Buffer to write to file
          * @throws StorageException If failed.
          * @throws IgniteCheckedException If failed.
@@ -2133,8 +2285,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      * Iterator over WAL-log.
      */
-    private static class RecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>>
-        implements WALIterator {
+    private static class RecordsIterator extends AbstractWalRecordsIterator {
         /** */
         private static final long serialVersionUID = 0L;
         /** */
@@ -2149,33 +2300,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** */
         private final PersistentStoreConfiguration psCfg;
 
-        /** */
-        private final RecordSerializer serializer;
-
-        /** */
-        private final GridCacheSharedContext cctx;
-
-        /** */
+        /** Optional start pointer. */
+        @Nullable
         private FileWALPointer start;
 
-        /** */
+        /** Optional end pointer. */
+        @Nullable
         private FileWALPointer end;
 
-        /** */
-        private IgniteBiTuple<WALPointer, WALRecord> curRec;
-
-        /** */
-        private long curIdx = -1;
-
-        /** */
-        private ReadFileHandle curHandle;
-
-        /** */
-        private ByteBuffer buf;
-
-        /** */
-        private IgniteLogger log;
-
         /**
          * @param cctx Shared context.
          * @param walWorkDir WAL work dir.
@@ -2183,37 +2315,33 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param start Optional start pointer.
          * @param end Optional end pointer.
          * @param psCfg Database configuration.
-         * @param serializer Serializer.
+         * @param serializer Serializer of current version to read headers.
          * @param archiver Archiver.
+         * @param log Logger
          * @throws IgniteCheckedException If failed to initialize WAL segment.
          */
         private RecordsIterator(
             GridCacheSharedContext cctx,
             File walWorkDir,
             File walArchiveDir,
-            FileWALPointer start,
-            FileWALPointer end,
+            @Nullable FileWALPointer start,
+            @Nullable FileWALPointer end,
             PersistentStoreConfiguration psCfg,
-            RecordSerializer serializer,
+            @NotNull RecordSerializer serializer,
             FileArchiver archiver,
             IgniteLogger log,
             int tlbSize
         ) throws IgniteCheckedException {
-            this.cctx = cctx;
+            super(log,
+                cctx,
+                serializer,
+                Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize()));
             this.walWorkDir = walWorkDir;
             this.walArchiveDir = walArchiveDir;
             this.psCfg = psCfg;
-            this.serializer = serializer;
             this.archiver = archiver;
             this.start = start;
             this.end = end;
-            this.log = log;
-
-            int buffSize = Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize());
-
-            // Do not allocate direct buffer for iterator.
-            buf = ByteBuffer.allocate(buffSize);
-            buf.order(ByteOrder.nativeOrder());
 
             init();
 
@@ -2221,40 +2349,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException {
-            IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
-
-            advance();
-
-            return ret;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            return curRec != null;
-        }
-
-        /** {@inheritDoc} */
         @Override protected void onClose() throws IgniteCheckedException {
             curRec = null;
 
-            if (curHandle != null) {
-                curHandle.close();
+            final ReadFileHandle handle = closeCurrentWalSegment();
+            if (handle != null && handle.workDir)
+                releaseWorkSegment(curWalSegmIdx);
 
-                if (curHandle.workDir)
-                    releaseWorkSegment(curIdx);
-
-                curHandle = null;
-            }
-
-            curIdx = Integer.MAX_VALUE;
+            curWalSegmIdx = Integer.MAX_VALUE;
         }
 
         /**
          * @throws IgniteCheckedException If failed to initialize first file handle.
          */
         private void init() throws IgniteCheckedException {
-            FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+            FileDescriptor[] descs = loadFileDescriptors(walArchiveDir);
 
             if (start != null) {
                 if (!F.isEmpty(descs)) {
@@ -2264,13 +2373,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                     for (FileDescriptor desc : descs) {
                         if (desc.idx == start.index()) {
-                            curIdx = start.index();
+                            curWalSegmIdx = start.index();
 
                             break;
                         }
                     }
 
-                    if (curIdx == -1) {
+                    if (curWalSegmIdx == -1) {
                         long lastArchived = descs[descs.length - 1].idx;
 
                         if (lastArchived > start.index())
@@ -2278,203 +2387,86 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                         // This pointer may be in work files because archiver did not
                         // copy the file yet, check that it is not too far forward.
-                        curIdx = start.index();
+                        curWalSegmIdx = start.index();
                     }
                 }
                 else {
                     // This means that whole checkpoint history fits in one segment in WAL work directory.
                     // Will start from this index right away.
-                    curIdx = start.index();
+                    curWalSegmIdx = start.index();
                 }
             }
             else
-                curIdx = !F.isEmpty(descs) ? descs[0].idx : 0;
+                curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx : 0;
 
-            curIdx--;
+            curWalSegmIdx--;
 
             if (log.isDebugEnabled())
-                log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curIdx=" + curIdx + ']');
-        }
-
-        /**
-         * @throws IgniteCheckedException If failed.
-         */
-        private void advance() throws IgniteCheckedException {
-            while (true) {
-                advanceRecord();
-
-                if (curRec != null)
-                    return;
-                else {
-                    advanceSegment();
-
-                    if (curHandle == null)
-                        return;
-                }
-            }
-        }
-
-        /**
-         *
-         */
-        private void advanceRecord() {
-            try {
-                ReadFileHandle hnd = curHandle;
-
-                if (hnd != null) {
-                    RecordSerializer ser = hnd.ser;
-
-                    int pos = (int)hnd.in.position();
-
-                    FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0);
-
-                    WALRecord rec = ser.readRecord(hnd.in, ptr);
-
-                    ptr.length(rec.size());
-
-                    curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec);
-                }
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (!(e instanceof SegmentEofException)) {
-                    if (log.isInfoEnabled())
-                        log.info("Stopping WAL iteration due to an exception: " + e.getMessage());
-                }
-
-                curRec = null;
-            }
+                log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']');
         }
 
-        /**
-         * @throws IgniteCheckedException If failed.
-         */
-        private void advanceSegment() throws IgniteCheckedException {
-            ReadFileHandle cur0 = curHandle;
-
-            if (cur0 != null) {
-                cur0.close();
+        /** {@inheritDoc} */
+        @Override protected ReadFileHandle advanceSegment(
+            @Nullable final ReadFileHandle curWalSegment) throws IgniteCheckedException {
+            if (curWalSegment != null) {
+                curWalSegment.close();
 
-                if (cur0.workDir)
-                    releaseWorkSegment(cur0.idx);
+                if (curWalSegment.workDir)
+                    releaseWorkSegment(curWalSegment.idx);
 
-                curHandle = null;
             }
 
             // We are past the end marker.
-            if (end != null && curIdx + 1 > end.index())
-                return;
+            if (end != null && curWalSegmIdx + 1 > end.index())
+                return null; //stop iteration
 
-            curIdx++;
+            curWalSegmIdx++;
 
             FileDescriptor fd;
 
-            boolean readArchive = canReadArchiveOrReserveWork(curIdx);
+            boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx);
 
             if (readArchive) {
                 fd = new FileDescriptor(new File(walArchiveDir,
-                    FileDescriptor.fileName(curIdx)));
+                    FileDescriptor.fileName(curWalSegmIdx)));
             }
             else {
-                long workIdx = curIdx % psCfg.getWalSegments();
+                long workIdx = curWalSegmIdx % psCfg.getWalSegments();
 
                 fd = new FileDescriptor(
                     new File(walWorkDir, FileDescriptor.fileName(workIdx)),
-                    curIdx);
+                    curWalSegmIdx);
             }
 
             if (log.isDebugEnabled())
-                log.debug("Reading next file [absIdx=" + curIdx + ", file=" + fd.file.getAbsolutePath() + ']');
+                log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']');
 
             assert fd != null;
 
+            ReadFileHandle nextHandle;
             try {
-                curHandle = initReadHandle(fd, start != null && curIdx == start.index() ? start : null);
+                nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null);
             }
             catch (FileNotFoundException e) {
                 if (readArchive)
                     throw new IgniteCheckedException("Missing WAL segment in the archive", e);
                 else
-                    curHandle = null;
+                    nextHandle = null;
             }
 
-            if (curHandle != null)
-                curHandle.workDir = !readArchive;
+            if (nextHandle != null)
+                nextHandle.workDir = !readArchive;
             else
-                releaseWorkSegment(curIdx);
+                releaseWorkSegment(curWalSegmIdx);
 
             curRec = null;
-        }
-
-        /**
-         * @param desc File descriptor.
-         * @param start Optional start pointer.
-         * @return Initialized file handle.
-         * @throws FileNotFoundException If segment file is missing.
-         * @throws IgniteCheckedException If initialized failed due to another unexpected error.
-         */
-        private ReadFileHandle initReadHandle(FileDescriptor desc, FileWALPointer start)
-            throws IgniteCheckedException, FileNotFoundException {
-            try {
-                RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
-
-                try {
-                    FileChannel channel = rf.getChannel();
-                    FileInput in = new FileInput(channel, buf);
-
-                    // Header record must be agnostic to the serializer version.
-                    WALRecord rec = serializer.readRecord(in,
-                        new FileWALPointer(desc.idx, (int)channel.position(), 0));
-
-                    if (rec == null)
-                        return null;
-
-                    if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
-                        throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile());
-
-                    int ver = ((HeaderRecord)rec).version();
-
-                    RecordSerializer ser = forVersion(cctx, ver);
-
-                    if (start != null && desc.idx == start.index())
-                        in.seek(start.fileOffset());
-
-                    return new ReadFileHandle(rf, desc.idx, cctx.igniteInstanceName(), ser, in);
-                }
-                catch (SegmentEofException | EOFException ignore) {
-                    try {
-                        rf.close();
-                    }
-                    catch (IOException ce) {
-                        throw new IgniteCheckedException(ce);
-                    }
-
-                    return null;
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    try {
-                        rf.close();
-                    }
-                    catch (IOException ce) {
-                        e.addSuppressed(ce);
-                    }
-
-                    throw e;
-                }
-            }
-            catch (FileNotFoundException e) {
-                throw e;
-            }
-            catch (IOException e) {
-                throw new IgniteCheckedException(
-                    "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e);
-            }
+            return nextHandle;
         }
 
         /**
          * @param absIdx Absolute index to check.
-         * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been
-         *      archived yet. In this case the corresponding work segment is reserved (will not be deleted until
-         *      release).
+         * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been archived
+         * yet. In this case the corresponding work segment is reserved (will not be deleted until release).
          */
         private boolean canReadArchiveOrReserveWork(long absIdx) {
             return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
@@ -2490,51 +2482,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Periodically flushes current file handle for {@link WALMode#BACKGROUND} WALMode.
+     * Flushes current file handle for {@link WALMode#BACKGROUND} WALMode.
+     * Called periodically from scheduler.
      */
-    private class QueueFlusher extends Thread {
-        /** */
-        private volatile boolean stopped;
+    private void doFlush() {
+        final FileWriteHandle hnd = currentHandle();
 
-        /**
-         * @param gridName Grid name.
-         */
-        private QueueFlusher(String gridName) {
-            super("wal-queue-flusher-#" + gridName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            while (!stopped) {
-                long wakeup = U.currentTimeMillis() + flushFreq;
-
-                LockSupport.parkUntil(wakeup);
-
-                FileWriteHandle hnd = currentHandle();
-
-                try {
-                    hnd.flush(hnd.head.get());
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to flush WAL record queue", e);
-                }
-            }
+        try {
+            hnd.flush(hnd.head.get());
         }
-
-        /**
-         * Signals stop, wakes up thread and waiting until completion.
-         */
-        private void shutdown() {
-            stopped = true;
-
-            LockSupport.unpark(this);
-
-            try {
-                join();
-            }
-            catch (InterruptedException ignore) {
-                // Got interrupted while waiting for flusher to shutdown.
-            }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to flush WAL record queue", e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
index 75a62a9..1ea7fa6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java
@@ -33,6 +33,8 @@ public interface RecordSerializer {
     public int version();
 
     /**
+     * Calculates record size in bytes, including the expected WAL pointer, CRC and type field.
+     *
      * @param record Record.
      * @return Size in bytes.
      */
@@ -45,7 +47,10 @@ public interface RecordSerializer {
     public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException;
 
     /**
+     * Loads record from input
+     *
      * @param in Data input to read data from.
+     * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
      * @return Read entry.
      */
     public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
new file mode 100644
index 0000000..5b65970
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+
+/**
+ * Result of an archiving (movement) operation.
+ * Replacement for the generic T3 tuple.
+ */
+class SegmentArchiveResult {
+    /** Absolute WAL segment file index. */
+    private final long absWalIdx;
+
+    /** Original work file. May be, and most likely will be, reused for a new WAL round. */
+    private final File origWorkFile;
+
+    /** Destination archive file. This file is a completed and closed archive segment. */
+    private final File dstArchiveFile;
+
+    /**
+     * Creates result
+     * @param absWalIdx Absolute wal index.
+     * @param origWorkFile Orig work file.
+     * @param dstArchiveFile Dst archive file.
+     */
+    SegmentArchiveResult(long absWalIdx, File origWorkFile, File dstArchiveFile) {
+        this.absWalIdx = absWalIdx;
+        this.origWorkFile = origWorkFile;
+        this.dstArchiveFile = dstArchiveFile;
+    }
+
+    /** @return {@link #absWalIdx} */
+    long getAbsIdx() {
+        return absWalIdx;
+    }
+
+    /** @return {@link #origWorkFile} */
+    File getOrigWorkFile() {
+        return origWorkFile;
+    }
+
+    /** @return {@link #dstArchiveFile} */
+    File getDstArchiveFile() {
+        return dstArchiveFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
index 80c375e..2f58e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java
@@ -21,7 +21,8 @@ import org.apache.ignite.IgniteCheckedException;
 
 /**
  * This exception is thrown either when we reach the end of file of WAL segment, or when we encounter
- * a record with type equal to {@code 0}.
+ * a record with type equal to
+ * {@link org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType#STOP_ITERATION_RECORD_TYPE}
  */
 public class SegmentEofException extends IgniteCheckedException {
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
new file mode 100644
index 0000000..8ea0585
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Factory for creating iterator over WAL files
+ */
+public class IgniteWalIteratorFactory {
+    /** Logger. */
+    private final IgniteLogger log;
+    /** Page size, in standalone iterator mode this value can't be taken from memory configuration */
+    private final int pageSize;
+
+    /**
+     * Creates WAL files iterator factory
+     * @param log Logger.
+     * @param pageSize Page size, size is validated
+     */
+    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+        this.log = log;
+        this.pageSize = pageSize;
+        new MemoryConfiguration().setPageSize(pageSize); // just for validate
+    }
+
+    /**
+     * Creates iterator for (archive) directory scan mode.
+     * Note that in this mode the set of files scanned by the end of iteration may be wider than the initial directory content.
+     * This mode does not support work directory scan because work directory file names contain unpredictable numbers.
+     * Such a file may break iteration.
+     *
+     * @param walDirWithConsistentId directory with WAL files. Should already contain node consistent ID as subfolder
+     * @return closable WAL records iterator, should be closed when no longer needed
+     * @throws IgniteCheckedException if failed to read folder
+     */
+    public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
+        return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx());
+    }
+
+    /**
+     * Creates iterator for file by file scan mode.
+     * This method may be used only for archive folder (not for work).
+     * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored
+     * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions
+     * @return closable WAL records iterator, should be closed when no longer needed
+     * @throws IgniteCheckedException if failed to read files
+     */
+    public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException {
+        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files);
+    }
+
+    /**
+     * Creates iterator for file by file scan mode.
+     * This method may be used for work folder, file indexes are scanned from the file context.
+     * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored.
+     * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions
+     * @return closable WAL records iterator, should be closed when no longer needed
+     * @throws IgniteCheckedException if failed to read files
+     */
+    public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException {
+        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files);
+    }
+
+    /**
+     * @return fake shared context required to create minimal services for record reading
+     */
+    @NotNull private GridCacheSharedContext prepareSharedCtx() {
+        final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log);
+
+        final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager();
+
+        dbMgr.setPageSize(pageSize);
+        return new GridCacheSharedContext<>(
+            kernalCtx, null, null, null,
+            null, null, dbMgr, null,
+            null, null, null, null,
+            null, null, null);
+    }
+}


Mime
View raw message