ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [36/50] ignite git commit: GG-12418 - WAL hangs on any error during segment rollover
Date Tue, 11 Jul 2017 09:27:35 GMT
GG-12418 - WAL hangs on any error during segment rollover


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17d881ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17d881ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17d881ba

Branch: refs/heads/master
Commit: 17d881ba0122a7f90cac9846c376300a1d001bdd
Parents: f1c8e59
Author: Pavel Kovalenko <jokserfn@gmail.com>
Authored: Mon Jul 10 13:55:47 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Jul 10 13:57:51 2017 +0300

----------------------------------------------------------------------
 .../PersistentStoreConfiguration.java           |  24 +++
 .../cache/persistence/file/FileIO.java          | 154 +++++++++++++++
 .../cache/persistence/file/FileIODecorator.java |  98 ++++++++++
 .../cache/persistence/file/FileIOFactory.java   |  45 +++++
 .../cache/persistence/file/FilePageStore.java   |  51 +++--
 .../persistence/file/FilePageStoreManager.java  |   2 +
 .../persistence/file/RandomAccessFileIO.java    | 110 +++++++++++
 .../file/RandomAccessFileIOFactory.java         |  42 ++++
 .../wal/AbstractWalRecordsIterator.java         |  22 ++-
 .../cache/persistence/wal/FileInput.java        |  40 ++--
 .../wal/FileWriteAheadLogManager.java           | 161 ++++++++-------
 .../wal/reader/IgniteWalIteratorFactory.java    |  13 +-
 .../wal/reader/StandaloneGridKernalContext.java |  15 +-
 .../reader/StandaloneIgnitePluginProcessor.java |  38 ++++
 .../reader/StandaloneWalRecordsIterator.java    |  37 ++--
 ...gnitePdsRecoveryAfterFileCorruptionTest.java |  11 +-
 .../db/wal/IgniteWalFlushFailoverTest.java      | 195 +++++++++++++++++++
 .../db/wal/crc/IgniteDataIntegrityTests.java    |  10 +-
 .../db/wal/reader/IgniteWalReaderTest.java      |   9 +-
 .../db/wal/reader/MockWalIteratorFactory.java   |   8 +-
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   4 +
 21 files changed, 919 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index b531f9d..4792483 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.configuration;
 
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import java.io.Serializable;
@@ -133,6 +135,9 @@ public class PersistentStoreConfiguration implements Serializable {
     /** Always write full pages. */
     private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES;
 
+    /** Factory to provide I/O interface for files */
+    private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory();
+
     /**
      * Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate
      * rate-based metrics.
@@ -539,6 +544,25 @@ public class PersistentStoreConfiguration implements Serializable {
     }
 
     /**
+     * Factory to provide implementation of FileIO interface
+     * which is used for any file read/write operations
+     *
+     * @return File I/O factory
+     */
+    public FileIOFactory getFileIOFactory() {
+        return fileIOFactory;
+    }
+
+    /**
+     * @param fileIOFactory File I/O factory
+     */
+    public PersistentStoreConfiguration setFileIOFactory(FileIOFactory fileIOFactory) {
+        this.fileIOFactory = fileIOFactory;
+
+        return this;
+    }
+
+    /**
      * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case
      * grid is used rarely.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
new file mode 100644
index 0000000..1e81150
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to perform file I/O operations.
+ */
+public interface FileIO extends AutoCloseable {
+    /**
+     * Returns current file position.
+     *
+     * @return  Current file position,
+     *          a non-negative integer counting the number of bytes
+     *          from the beginning of the file to the current position.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public long position() throws IOException;
+
+    /**
+     * Sets new current file position.
+     *
+     * @param  newPosition
+     *         The new position, a non-negative integer counting
+     *         the number of bytes from the beginning of the file.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public void position(long newPosition) throws IOException;
+
+    /**
+     * Reads a sequence of bytes from this file into the {@code destinationBuffer}.
+     *
+     * @param destinationBuffer Destination byte buffer.
+     *
+     * @return Number of read bytes.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public int read(ByteBuffer destinationBuffer) throws IOException;
+
+    /**
+     * Reads a sequence of bytes from this file into the {@code destinationBuffer}
+     * starting from specified file {@code position}.
+     *
+     * @param destinationBuffer Destination byte buffer.
+     * @param position Starting position of file.
+     *
+     * @return Number of read bytes.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public int read(ByteBuffer destinationBuffer, long position) throws IOException;
+
+    /**
+     * Reads up to {@code length} bytes from this file into the {@code buffer}.
+     *
+     * @param buffer Destination byte array.
+     * @param offset The start offset in array {@code buffer}
+     *               at which the data is written.
+     * @param length Maximum number of bytes read.
+     *
+     * @return Number of read bytes.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public int read(byte[] buffer, int offset, int length) throws IOException;
+
+    /**
+     * Writes a sequence of bytes to this file from the {@code sourceBuffer}.
+     *
+     * @param sourceBuffer Source buffer.
+     *
+     * @return Number of written bytes.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public int write(ByteBuffer sourceBuffer) throws IOException;
+
+    /**
+     * Writes a sequence of bytes to this file from the {@code sourceBuffer}
+     * starting from specified file {@code position}
+     *
+     * @param sourceBuffer Source buffer.
+     * @param position Starting file position.
+     *
+     * @return Number of written bytes.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public int write(ByteBuffer sourceBuffer, long position) throws IOException;
+
+    /**
+     * Writes {@code length} bytes from the {@code buffer}
+     * starting at offset {@code offset} to this file.
+     *
+     * @param buffer Source byte array.
+     * @param offset Start offset in the {@code buffer}.
+     * @param length Number of bytes to write.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public void write(byte[] buffer, int offset, int length) throws IOException;
+
+    /**
+     * Forces any updates of this file to be written to the storage
+     * device that contains it.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public void force() throws IOException;
+
+    /**
+     * Returns current file size in bytes.
+     *
+     * @return File size.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public long size() throws IOException;
+
+    /**
+     * Truncates current file to zero length
+     * and resets current file position to zero.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public void clear() throws IOException;
+
+    /**
+     * Closes current file.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    @Override public void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
new file mode 100644
index 0000000..3e80ef8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Decorator class for File I/O
+ */
+public class FileIODecorator implements FileIO {
+
+    /** File I/O delegate */
+    private final FileIO delegate;
+
+    /**
+     *
+     * @param delegate File I/O delegate
+     */
+    public FileIODecorator(FileIO delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() throws IOException {
+        return delegate.position();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(long newPosition) throws IOException {
+        delegate.position(newPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+        return delegate.read(destinationBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+        return delegate.read(destinationBuffer, position);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+        return delegate.read(buffer, offset, length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+        return delegate.write(sourceBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+        return delegate.write(sourceBuffer, position);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+        delegate.write(buffer, offset, length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        delegate.force();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size() throws IOException {
+        return delegate.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() throws IOException {
+        delegate.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        delegate.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
new file mode 100644
index 0000000..0ffc653
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+public interface FileIOFactory extends Serializable {
+
+    /**
+     * Creates I/O interface for file with default I/O mode
+     *
+     * @param file File
+     * @return File I/O interface
+     * @throws IOException If I/O interface creation failed
+     */
+    FileIO create(File file) throws IOException;
+
+    /**
+     * Creates I/O interface for file with specified mode
+     *
+     * @param file File
+     * @param mode I/O mode (for example, {@code "rw"})
+     * @return File I/O interface
+     * @throws IOException If I/O interface creation failed
+     */
+    FileIO create(File file, String mode) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 6ddc9fc..c827e96 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,11 +59,11 @@ public class FilePageStore implements PageStore {
     /** Database configuration. */
     private final MemoryConfiguration dbCfg;
 
-    /** */
-    private RandomAccessFile file;
+    /** Factory to provide I/O interfaces for read/write operations with files */
+    private final FileIOFactory ioFactory;
 
-    /** */
-    private FileChannel ch;
+    /** I/O interface for read/write operations with file */
+    private FileIO fileIO;
 
     /** */
     private final AtomicLong allocated;
@@ -91,11 +89,12 @@ public class FilePageStore implements PageStore {
     /**
      * @param file File.
      */
-    public FilePageStore(byte type, File file, MemoryConfiguration cfg) {
+    public FilePageStore(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) {
         this.type = type;
 
         cfgFile = file;
         dbCfg = cfg;
+        ioFactory = factory;
 
         allocated = new AtomicLong();
 
@@ -136,7 +135,7 @@ public class FilePageStore implements PageStore {
             ByteBuffer hdr = header(type, dbCfg.getPageSize());
 
             while (hdr.remaining() > 0)
-                ch.write(hdr);
+                fileIO.write(hdr);
         }
         catch (IOException e) {
             throw new IgniteException("Check file failed.", e);
@@ -154,7 +153,7 @@ public class FilePageStore implements PageStore {
             ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
 
             while (hdr.remaining() > 0)
-                ch.read(hdr);
+                fileIO.read(hdr);
 
             hdr.rewind();
 
@@ -186,7 +185,7 @@ public class FilePageStore implements PageStore {
                     " [expectedPageSize=" + dbCfg.getPageSize() +
                     ", filePageSize=" + pageSize + "]");
 
-            long fileSize = file.length();
+            long fileSize = cfgFile.length();
 
             if (fileSize == HEADER_SIZE) // Every file has a special meta page.
                 fileSize = pageSize + HEADER_SIZE;
@@ -214,9 +213,9 @@ public class FilePageStore implements PageStore {
             if (!inited)
                 return;
 
-            ch.force(false);
+            fileIO.force();
 
-            file.close();
+            fileIO.close();
 
             if (cleanFile)
                 cfgFile.delete();
@@ -241,9 +240,7 @@ public class FilePageStore implements PageStore {
 
             this.tag = tag;
 
-            ch.position(0);
-
-            file.setLength(0);
+            fileIO.clear();
 
             allocated.set(initFile());
         }
@@ -277,7 +274,7 @@ public class FilePageStore implements PageStore {
 
         try {
             if (inited)
-                allocated.set(ch.size());
+                allocated.set(fileIO.size());
 
             recover = false;
         }
@@ -303,7 +300,7 @@ public class FilePageStore implements PageStore {
             int len = pageSize;
 
             do {
-                int n = ch.read(pageBuf, off);
+                int n = fileIO.read(pageBuf, off);
 
                 // If page was not written yet, nothing to read.
                 if (n < 0) {
@@ -330,7 +327,7 @@ public class FilePageStore implements PageStore {
                 if ((savedCrc32 ^ curCrc32) != 0)
                     throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
                         "[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) +
-                        ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() +
+                        ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + fileIO.size() +
                         ", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]");
             }
 
@@ -356,7 +353,7 @@ public class FilePageStore implements PageStore {
             long off = 0;
 
             do {
-                int n = ch.read(buf, off);
+                int n = fileIO.read(buf, off);
 
                 // If page was not written yet, nothing to read.
                 if (n < 0)
@@ -382,16 +379,14 @@ public class FilePageStore implements PageStore {
 
             try {
                 if (!inited) {
-                    RandomAccessFile rndFile = null;
+                    FileIO fileIO = null;
 
                     IgniteCheckedException err = null;
 
                     try {
-                        file = rndFile = new RandomAccessFile(cfgFile, "rw");
-
-                        ch = file.getChannel();
+                        this.fileIO = fileIO = ioFactory.create(cfgFile, "rw");
 
-                        if (file.length() == 0)
+                        if (cfgFile.length() == 0)
                             allocated.set(initFile());
                         else
                             allocated.set(checkFile());
@@ -402,9 +397,9 @@ public class FilePageStore implements PageStore {
                         throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e);
                     }
                     finally {
-                        if (err != null && rndFile != null)
+                        if (err != null && fileIO != null)
                             try {
-                                rndFile.close();
+                                fileIO.close();
                             }
                             catch (IOException e) {
                                 err.addSuppressed(e);
@@ -447,7 +442,7 @@ public class FilePageStore implements PageStore {
             int len = pageSize;
 
             do {
-                int n = ch.write(pageBuf, off);
+                int n = fileIO.write(pageBuf, off);
 
                 off += n;
 
@@ -478,7 +473,7 @@ public class FilePageStore implements PageStore {
         try {
             init();
 
-            ch.force(false);
+            fileIO.force();
         }
         catch (IOException e) {
             throw new IgniteCheckedException("Sync error", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 6aa2243..4a56ec7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -367,6 +367,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
         FilePageStore idxStore = new FilePageStore(
             PageMemory.FLAG_IDX,
             idxFile,
+            pstCfg.getFileIOFactory(),
             cctx.kernalContext().config().getMemoryConfiguration());
 
         FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
@@ -375,6 +376,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
             FilePageStore partStore = new FilePageStore(
                 PageMemory.FLAG_DATA,
                 new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)),
+                pstCfg.getFileIOFactory(),
                 cctx.kernalContext().config().getMemoryConfiguration()
             );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
new file mode 100644
index 0000000..73a560a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * File I/O implementation based on {@code java.io.RandomAccessFile}.
+ */
+public class RandomAccessFileIO implements FileIO {
+
+    /**
+     * Random access file associated with this I/O
+     */
+    private final RandomAccessFile file;
+
+    /**
+     * File channel associated with {@code file}
+     */
+    private final FileChannel channel;
+
+    /**
+     * Creates I/O implementation for specified {@code file}
+     *
+     * @param file Random access file
+     */
+    public RandomAccessFileIO(RandomAccessFile file) {
+        this.file = file;
+        this.channel = file.getChannel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() throws IOException {
+        return channel.position();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(long newPosition) throws IOException {
+        channel.position(newPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+        return channel.read(destinationBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+        return channel.read(destinationBuffer, position);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+        return file.read(buffer, offset, length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+        return channel.write(sourceBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+        return channel.write(sourceBuffer, position);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+        file.write(buffer, offset, length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        channel.force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size() throws IOException {
+        return channel.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() throws IOException {
+        channel.position(0);
+        file.setLength(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        file.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
new file mode 100644
index 0000000..6b731f2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * File I/O factory which provides RandomAccessFileIO implementation of FileIO.
+ */
+public class RandomAccessFileIOFactory implements FileIOFactory {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file) throws IOException {
+        return create(file, "rw");
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file, String mode) throws IOException {
+        RandomAccessFile rf = new RandomAccessFile(file, mode);
+
+        return new RandomAccessFileIO(rf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index f4bace1..beed90b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -21,15 +21,15 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -71,6 +71,9 @@ public abstract class AbstractWalRecordsIterator
     /** Serializer of current version to read headers. */
     @NotNull private final RecordSerializer serializer;
 
+    /** Factory to provide I/O interfaces for read/write operations with files */
+    @NotNull protected final FileIOFactory ioFactory;
+
     /** Utility buffer for reading records */
     private final ByteBufferExpander buf;
 
@@ -84,11 +87,13 @@ public abstract class AbstractWalRecordsIterator
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
         @NotNull final RecordSerializer serializer,
+        @NotNull final FileIOFactory ioFactory,
         final int bufSize
     ) {
         this.log = log;
         this.sharedCtx = sharedCtx;
         this.serializer = serializer;
+        this.ioFactory = ioFactory;
 
         // Do not allocate direct buffer for iterator.
         buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
@@ -229,15 +234,14 @@ public abstract class AbstractWalRecordsIterator
         @Nullable final FileWALPointer start)
         throws IgniteCheckedException, FileNotFoundException {
         try {
-            RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
+            FileIO fileIO = ioFactory.create(desc.file, "r");
 
             try {
-                FileChannel ch = rf.getChannel();
-                FileInput in = new FileInput(ch, buf);
+                FileInput in = new FileInput(fileIO, buf);
 
                 // Header record must be agnostic to the serializer version.
                 WALRecord rec = serializer.readRecord(in,
-                    new FileWALPointer(desc.idx, (int)ch.position(), 0));
+                    new FileWALPointer(desc.idx, (int)fileIO.position(), 0));
 
                 if (rec == null)
                     return null;
@@ -252,11 +256,11 @@ public abstract class AbstractWalRecordsIterator
                 if (start != null && desc.idx == start.index())
                     in.seek(start.fileOffset());
 
-                return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
+                return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in);
             }
             catch (SegmentEofException | EOFException ignore) {
                 try {
-                    rf.close();
+                    fileIO.close();
                 }
                 catch (IOException ce) {
                     throw new IgniteCheckedException(ce);
@@ -266,7 +270,7 @@ public abstract class AbstractWalRecordsIterator
             }
             catch (IOException | IgniteCheckedException e) {
                 try {
-                    rf.close();
+                    fileIO.close();
                 }
                 catch (IOException ce) {
                     e.addSuppressed(ce);

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 00c7c02..6443a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.jetbrains.annotations.NotNull;
@@ -36,8 +36,8 @@ public final class FileInput implements ByteBufferBackedDataInput {
      */
     private ByteBuffer buf;
 
-    /** File channel to read chunks from */
-    private FileChannel ch;
+    /** I/O interface for read/write operations with file */
+    private FileIO io;
 
     /** */
     private long pos;
@@ -46,28 +46,20 @@ public final class FileInput implements ByteBufferBackedDataInput {
     private ByteBufferExpander expBuf;
 
     /**
-     * @param ch  Channel to read from
-     * @param buf Buffer for reading blocks of data into
+     * @param io FileIO to read from.
+     * @param buf Buffer for reading blocks of data into.
      */
-    public FileInput(FileChannel ch, ByteBuffer buf) throws IOException {
-        assert ch != null;
+    public FileInput(FileIO io, ByteBufferExpander buf) throws IOException {
+        assert io != null;
 
-        this.ch = ch;
-        this.buf = buf;
+        this.io = io;
+        this.buf = buf.buffer();
 
-        pos = ch.position();
+        expBuf = buf;
 
-        clearBuffer();
-    }
-
-    /**
-     * @param ch Channel to read from
-     * @param expBuf ByteBufferWrapper with ability expand buffer dynamically.
-     */
-    public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException {
-        this(ch, expBuf.buffer());
+        pos = io.position();
 
-        this.expBuf = expBuf;
+        clearBuffer();
     }
 
     /**
@@ -84,10 +76,10 @@ public final class FileInput implements ByteBufferBackedDataInput {
      * @param pos Position in bytes from file begin.
      */
     public void seek(long pos) throws IOException {
-        if (pos > ch.size())
+        if (pos > io.size())
             throw new EOFException();
 
-        ch.position(pos);
+        io.position(pos);
 
         this.pos = pos;
 
@@ -118,10 +110,10 @@ public final class FileInput implements ByteBufferBackedDataInput {
         buf.compact();
 
         do {
-            int read = ch.read(buf);
+            int read = io.read(buf);
 
             if (read == -1)
-                throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes");
+                throw new EOFException("EOF at position [" + io.position() + "] expected to read [" + requested + "] bytes");
 
             available += read;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 162f43d..5c112fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -22,10 +22,8 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.sql.Time;
 import java.util.Arrays;
@@ -48,6 +46,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -61,7 +60,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl;
-import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -153,6 +153,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** */
     private volatile long oldestArchiveSegmentIdx;
 
+    /** Factory to provide I/O interfaces for read/write operations with files */
+    private final FileIOFactory ioFactory;
+
     /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
@@ -181,6 +184,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Current log segment handle */
     private volatile FileWriteHandle currentHnd;
 
+    /** Environment failure. */
+    private volatile Throwable envFailed;
+
     /**
      * Positive (non-0) value indicates WAL can be archived even if not complete<br>
      * See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
@@ -225,6 +231,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         flushFreq = psCfg.getWalFlushFrequency();
         fsyncDelay = psCfg.getWalFsyncDelay();
         alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages();
+        ioFactory = psCfg.getFileIOFactory();
         walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity();
         evt = ctx.event();
     }
@@ -322,7 +329,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 archiver.shutdown();
         }
         catch (Exception e) {
-            U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e);
+            U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
         }
     }
 
@@ -493,6 +500,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 return ptr;
             }
 
+            checkEnvironment();
+
             if (isStopping())
                 throw new IgniteCheckedException("Stopping.");
         }
@@ -549,6 +558,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             end,
             psCfg,
             serializer,
+            ioFactory,
             archiver,
             log,
             tlbSize
@@ -800,13 +810,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
 
         try {
-            RandomAccessFile file = new RandomAccessFile(curFile, "rw");
+            FileIO fileIO = ioFactory.create(curFile);
 
             try {
                 // readSerializerVersion will change the channel position.
                 // This is fine because the FileWriteHandle consitructor will move it
                 // to offset + len anyways.
-                int serVer = readSerializerVersion(file, curFile, absIdx);
+                int serVer = readSerializerVersion(fileIO, curFile, absIdx);
 
                 RecordSerializer ser = forVersion(cctx, serVer);
 
@@ -815,7 +825,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         ", offset=" + offset + ", ver=" + serVer + ']');
 
                 FileWriteHandle hnd = new FileWriteHandle(
-                    file,
+                    fileIO,
                     absIdx,
                     cctx.igniteInstanceName(),
                     offset + len,
@@ -835,7 +845,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
-                file.close();
+                fileIO.close();
 
                 throw e;
             }
@@ -862,10 +872,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (log.isDebugEnabled())
                 log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());
 
-            RandomAccessFile file = new RandomAccessFile(nextFile, "rw");
+            FileIO fileIO = ioFactory.create(nextFile);
 
             FileWriteHandle hnd = new FileWriteHandle(
-                file,
+                fileIO,
                 curIdx + 1,
                 cctx.igniteInstanceName(),
                 0,
@@ -929,22 +939,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (log.isDebugEnabled())
             log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
-        try (RandomAccessFile rnd = new RandomAccessFile(file, "rw")) {
+        try (FileIO fileIO = ioFactory.create(file, "rw")) {
             int left = psCfg.getWalSegmentSize();
 
             if (mode == WALMode.DEFAULT) {
                 while (left > 0) {
                     int toWrite = Math.min(FILL_BUF.length, left);
 
-                    rnd.write(FILL_BUF, 0, toWrite);
+                    fileIO.write(FILL_BUF, 0, toWrite);
 
                     left -= toWrite;
                 }
 
-                rnd.getChannel().force(false);
+                fileIO.force();
             }
             else
-                rnd.setLength(0);
+                fileIO.clear();
         }
         catch (IOException e) {
             throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
@@ -1033,6 +1043,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * @throws StorageException If environment is no longer valid and we missed a WAL write.
+     */
+    private void checkEnvironment() throws StorageException {
+        if (envFailed != null) // Non-null once an earlier failure has been recorded; it becomes the cause below.
+            throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " +
+                    "previous error)", envFailed);
+    }
+
+    /**
      * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate
      * the work WAL segment: S(N) = N % psCfg.walSegments.
      * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
@@ -1337,8 +1356,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
                 if (mode == WALMode.DEFAULT) {
-                    try (RandomAccessFile f0 = new RandomAccessFile(dstFile, "rw")) {
-                        f0.getChannel().force(false);
+                    try (FileIO f0 = ioFactory.create(dstFile, "rw")) {
+                        f0.force();
                     }
                 }
             }
@@ -1402,20 +1421,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * @param rf Random access file.
+     * @param io I/O interface for file.
      * @param file File object.
      * @param idx File index to read.
      * @return Serializer version stored in the file.
      * @throws IOException If failed to read serializer version.
      * @throws IgniteCheckedException If failed to read serializer version.
      */
-    private int readSerializerVersion(RandomAccessFile rf, File file, long idx)
+    private int readSerializerVersion(FileIO io, File file, long idx)
         throws IOException, IgniteCheckedException {
         try {
             ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
             buf.order(ByteOrder.nativeOrder());
 
-            FileInput in = new FileInput(rf.getChannel(), buf);
+            FileInput in = new FileInput(io,
+                new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
 
             // Header record must be agnostic to the serializer version.
             WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0));
@@ -1541,11 +1561,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      *
      */
     private abstract static class FileHandle {
-        /** */
-        protected RandomAccessFile file;
-
-        /** */
-        protected FileChannel ch;
+        /** I/O interface for read/write operations with file */
+        protected FileIO fileIO;
 
         /** Absolute WAL segment file index (incremental counter) */
         protected final long idx;
@@ -1554,17 +1571,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         protected String gridName;
 
         /**
-         * @param file File.
+         * @param fileIO I/O interface for read/write operations of FileHandle.
          * @param idx Absolute WAL segment file index (incremental counter).
          */
-        private FileHandle(RandomAccessFile file, long idx, String gridName) {
-            this.file = file;
+        private FileHandle(FileIO fileIO, long idx, String gridName) {
+            this.fileIO = fileIO;
             this.idx = idx;
             this.gridName = gridName;
-
-            ch = file.getChannel();
-
-            assert ch != null;
         }
     }
 
@@ -1585,19 +1598,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private boolean workDir;
 
         /**
-         * @param file File to read.
+         * @param fileIO I/O interface for read/write operations of FileHandle.
          * @param idx Absolute WAL segment file index (incremental counter).
          * @param ser Entry serializer.
          * @param in File input.
          */
         ReadFileHandle(
-            RandomAccessFile file,
-            long idx,
-            String gridName,
-            RecordSerializer ser,
-            FileInput in
+                FileIO fileIO,
+                long idx,
+                String gridName,
+                RecordSerializer ser,
+                FileInput in
         ) {
-            super(file, idx, gridName);
+            super(fileIO, idx, gridName);
 
             this.ser = ser;
             this.in = in;
@@ -1608,7 +1621,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         public void close() throws IgniteCheckedException {
             try {
-                file.close();
+                fileIO.close();
             }
             catch (IOException e) {
                 throw new IgniteCheckedException(e);
@@ -1644,10 +1657,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** */
         private volatile long lastFsyncPos;
 
-        /** Environment failure. */
-        private volatile Throwable envFailed;
-
-        /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */
+        /** Stop guard guaranteeing that only one thread will be successful in calling {@link #close(boolean)}. */
         private final AtomicBoolean stop = new AtomicBoolean(false);
 
         /** */
@@ -1661,12 +1671,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * Next segment available condition.
-         * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code>
+         * Protection from "spurious wakeup" is provided by predicate {@link #fileIO}=<code>null</code>
          */
         private final Condition nextSegment = lock.newCondition();
 
         /**
-         * @param file Mapped file to use.
+         * @param fileIO I/O file interface to use.
          * @param idx Absolute WAL segment file index for easy access.
          * @param pos Position.
          * @param maxSegmentSize Max segment size.
@@ -1674,18 +1684,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IOException If failed.
          */
         private FileWriteHandle(
-            RandomAccessFile file,
+            FileIO fileIO,
             long idx,
             String gridName,
             long pos,
             long maxSegmentSize,
             RecordSerializer serializer
         ) throws IOException {
-            super(file, idx, gridName);
+            super(fileIO, idx, gridName);
 
             assert serializer != null;
 
-            ch.position(pos);
+            fileIO.position(pos);
 
             this.maxSegmentSize = maxSegmentSize;
             this.serializer = serializer;
@@ -1887,6 +1897,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             catch (Throwable e) {
                 invalidateEnvironment(e);
 
+                // All workers waiting for a next segment must be woken up and stopped
+                signalNextAvailable();
+
                 throw e;
             }
         }
@@ -1990,7 +2003,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     long start = metricsEnabled ? System.nanoTime() : 0;
 
                     try {
-                        ch.force(false);
+                        fileIO.force();
                     }
                     catch (IOException e) {
                         throw new StorageException(e);
@@ -2027,20 +2040,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 try {
                     int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE;
+
                     if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
                         //it is expected there is sufficient space for this record because rollover should run early
                         final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
                         buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1));
-                        final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1);
+
+                        final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1);
                         RecordV1Serializer.putPosition(buf, pointer);
+
                         buf.rewind();
-                        ch.write(buf, written);
+
+                        fileIO.write(buf, written);
 
                         if (mode == WALMode.DEFAULT)
-                            ch.force(false);
+                            fileIO.force();
                     }
 
-                    ch.close();
+                    fileIO.close();
                 }
                 catch (IOException e) {
                     throw new IgniteCheckedException(e);
@@ -2064,13 +2081,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             try {
                 WALRecord rec = head.get();
 
-                assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head "
+                if (envFailed == null) {
+                    assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head "
                     + (rec != null ? rec.getClass().getSimpleName() : "null");
 
-                assert written == lastFsyncPos || mode != WALMode.DEFAULT :
+                    assert written == lastFsyncPos || mode != WALMode.DEFAULT :
                     "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']';
+                }
 
-                ch = null;
+                fileIO = null;
 
                 nextSegment.signalAll();
             }
@@ -2086,7 +2105,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             lock.lock();
 
             try {
-                while (ch != null)
+                while (fileIO != null)
                     U.await(nextSegment);
             }
             finally {
@@ -2108,7 +2127,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             lock.lock();
 
             try {
-                assert ch != null : "Writing to a closed segment.";
+                assert fileIO != null : "Writing to a closed segment.";
 
                 checkEnvironment();
 
@@ -2151,10 +2170,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 assert size > 0 : size;
 
                 try {
-                    assert written == ch.position();
+                    assert written == fileIO.position();
 
                     do {
-                        ch.write(buf);
+                        fileIO.write(buf);
                     }
                     while (buf.hasRemaining());
 
@@ -2162,7 +2181,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                     metrics.onWalBytesWritten(size);
 
-                    assert written == ch.position();
+                    assert written == fileIO.position();
                 }
                 catch (IOException e) {
                     invalidateEnvironmentLocked(e);
@@ -2215,25 +2234,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @throws StorageException If environment is no longer valid and we missed a WAL write.
-         */
-        private void checkEnvironment() throws StorageException {
-            if (envFailed != null)
-                throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " +
-                    "previous error)", envFailed);
-        }
-
-        /**
          * @return Safely reads current position of the file channel as String. Will return "null" if channel is null.
          */
         private String safePosition() {
-            FileChannel ch = this.ch;
+            FileIO io = this.fileIO;
 
-            if (ch == null)
+            if (io == null)
                 return "null";
 
             try {
-                return String.valueOf(ch.position());
+                return String.valueOf(io.position());
             }
             catch (IOException e) {
                 return "{Failed to read channel position: " + e.getMessage() + "}";
@@ -2320,6 +2330,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             @Nullable FileWALPointer end,
             PersistentStoreConfiguration psCfg,
             @NotNull RecordSerializer serializer,
+            FileIOFactory ioFactory,
             FileArchiver archiver,
             IgniteLogger log,
             int tlbSize
@@ -2327,6 +2338,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             super(log,
                 cctx,
                 serializer,
+                ioFactory,
                 psCfg.getWalRecordIteratorBufferSize());
             this.walWorkDir = walWorkDir;
             this.walArchiveDir = walArchiveDir;
@@ -2479,11 +2491,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      */
     private void doFlush() {
         final FileWriteHandle hnd = currentHandle();
-
         try {
             hnd.flush(hnd.head.get());
         }
-        catch (IgniteCheckedException e) {
+        catch (Exception e) {
             U.warn(log, "Failed to flush WAL record queue", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 8ea0585..4e3998b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -34,15 +35,18 @@ public class IgniteWalIteratorFactory {
     private final IgniteLogger log;
     /** Page size, in standalone iterator mode this value can't be taken from memory configuration */
     private final int pageSize;
+    /** Factory to provide I/O interfaces for read/write operations with files */
+    private final FileIOFactory ioFactory;
 
     /**
      * Creates WAL files iterator factory
      * @param log Logger.
      * @param pageSize Page size, size is validated
      */
-    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) {
+    public IgniteWalIteratorFactory(@NotNull IgniteLogger log, @NotNull FileIOFactory ioFactory, int pageSize) {
         this.log = log;
         this.pageSize = pageSize;
+        this.ioFactory = ioFactory;
         new MemoryConfiguration().setPageSize(pageSize); // just for validate
     }
 
@@ -57,7 +61,7 @@ public class IgniteWalIteratorFactory {
      * @throws IgniteCheckedException if failed to read folder
      */
     public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx());
+        return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory);
     }
 
     /**
@@ -69,7 +73,7 @@ public class IgniteWalIteratorFactory {
      * @throws IgniteCheckedException if failed to read files
      */
     public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files);
+        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, files);
     }
 
     /**
@@ -81,7 +85,7 @@ public class IgniteWalIteratorFactory {
      * @throws IgniteCheckedException if failed to read files
      */
     public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files);
+        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, files);
     }
 
     /**
@@ -93,6 +97,7 @@ public class IgniteWalIteratorFactory {
         final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager();
 
         dbMgr.setPageSize(pageSize);
+
         return new GridCacheSharedContext<>(
             kernalCtx, null, null, null,
             null, null, dbMgr, null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index df932e6..02b9352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridComponent;
@@ -82,13 +83,25 @@ import org.jetbrains.annotations.Nullable;
  * Dummy grid kernal context
  */
 public class StandaloneGridKernalContext implements GridKernalContext {
+    /** */
     private IgniteLogger log;
 
+    /** */
+    private IgnitePluginProcessor pluginProc;
+
     /**
      * @param log Logger.
      */
     StandaloneGridKernalContext(IgniteLogger log) {
         this.log = log;
+
+        try {
+            pluginProc = new StandaloneIgnitePluginProcessor(
+                this, config());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IllegalStateException("Must not fail on empty providers list.", e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -278,7 +291,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
 
     /** {@inheritDoc} */
     @Override public IgnitePluginProcessor plugins() {
-        return null;
+        return pluginProc;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
new file mode 100644
index 0000000..838fc85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.Collections;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.plugin.PluginProvider;
+
+/**
+ * Plugin processor that is always created with an empty list of plugin providers.
+ */
+public class StandaloneIgnitePluginProcessor extends IgnitePluginProcessor {
+    /**
+     * @param ctx Kernal context.
+     * @param cfg Ignite configuration.
+     */
+    public StandaloneIgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) throws IgniteCheckedException {
+        super(ctx, cfg, Collections.<PluginProvider>emptyList());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index f17c112..ecad70a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -21,10 +21,7 @@ import java.io.DataInput;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -33,7 +30,10 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
@@ -83,14 +83,17 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder
      * @param log Logger.
      * @param sharedCtx Shared context.
+     * @param ioFactory File I/O factory.
      */
     StandaloneWalRecordsIterator(
-        @NotNull final File walFilesDir,
-        @NotNull final IgniteLogger log,
-        @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException {
+        @NotNull File walFilesDir,
+        @NotNull IgniteLogger log,
+        @NotNull GridCacheSharedContext sharedCtx,
+        @NotNull FileIOFactory ioFactory) throws IgniteCheckedException {
         super(log,
             sharedCtx,
             new RecordV1Serializer(sharedCtx),
+            ioFactory,
             BUF_SIZE);
         init(walFilesDir, false, null);
         advance();
@@ -101,17 +104,20 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      *
      * @param log Logger.
      * @param sharedCtx Shared context.
+     * @param ioFactory File I/O factory.
      * @param workDir Work directory is scanned, false - archive
      * @param walFiles Wal files.
      */
     StandaloneWalRecordsIterator(
-        @NotNull final IgniteLogger log,
-        @NotNull final GridCacheSharedContext sharedCtx,
-        final boolean workDir,
-        @NotNull final File... walFiles) throws IgniteCheckedException {
+            @NotNull IgniteLogger log,
+            @NotNull GridCacheSharedContext sharedCtx,
+            @NotNull FileIOFactory ioFactory,
+            boolean workDir,
+            @NotNull File... walFiles) throws IgniteCheckedException {
         super(log,
             sharedCtx,
             new RecordV1Serializer(sharedCtx),
+            ioFactory,
             BUF_SIZE);
         this.workDir = workDir;
         init(null, workDir, walFiles);
@@ -138,10 +144,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
         }
         else {
             this.workDir = workDir;
+
             if (workDir)
                 walFileDescriptors = scanIndexesFromFileHeaders(walFiles);
             else
                 walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles)));
+
             curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0;
         }
         curWalSegmIdx--;
@@ -172,13 +180,10 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
 
             FileWALPointer ptr;
 
-            try (RandomAccessFile rf = new RandomAccessFile(file, "r");) {
-                final FileChannel ch = rf.getChannel();
-                final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
-
-                buf.order(ByteOrder.nativeOrder());
+            try (FileIO fileIO = ioFactory.create(file, "r")) {
+                final DataInput in = new FileInput(fileIO,
+                    new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()));
 
-                final DataInput in = new FileInput(ch, buf);
                 // Header record must be agnostic to the serializer version.
                 final int type = in.readUnsignedByte();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 6847482..e086258 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.persistence;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
 import java.util.Collection;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -43,7 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
@@ -191,13 +190,13 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract
 
         FilePageStore filePageStore = (FilePageStore)store;
 
-        FileChannel ch = U.field(filePageStore, "ch");
+        FileIO fileIO = U.field(filePageStore, "fileIO");
 
-        long size = ch.size();
+        long size = fileIO.size();
 
-        ch.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
+        fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE);
 
-        ch.force(false);
+        fileIO.force();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
new file mode 100644
index 0000000..cad10ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalState;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Starts a node whose File I/O throws on write and then waits for the node to reach the STOPPED state.
+ */
+public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
+
+    /** Name of the transactional cache used by the test. */
+    private static final String TEST_CACHE = "testCache";
+
+    /** Selects the WAL segment size set in {@link #getConfiguration(String)}: the larger one when {@code true}. */
+    private boolean flushByTimeout;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30_000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration(TEST_CACHE)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration()
+                .setName("dfltMemPlc")
+                .setInitialSize(2 * 1024L * 1024L * 1024L);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration()
+                .setMemoryPolicies(memPlcCfg)
+                .setDefaultMemoryPolicyName(memPlcCfg.getName());
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        PersistentStoreConfiguration storeCfg = new PersistentStoreConfiguration()
+                .setFileIOFactory(new FailingFileIOFactory())
+                .setWalMode(WALMode.BACKGROUND)
+                // Setting WAL Segment size to high values forces flushing by timeout.
+                .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000);
+
+        cfg.setPersistentStoreConfiguration(storeCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests flushing error recovery when the flush is triggered asynchronously by timeout.
+     *
+     * @throws Exception In case of failure.
+     */
+    public void testErrorOnFlushByTimeout() throws Exception {
+        flushByTimeout = true;
+        flushingErrorTest();
+    }
+
+    /**
+     * Tests flushing error recovery when the flush is triggered directly by a transaction commit.
+     *
+     * @throws Exception In case of failure.
+     */
+    public void testErrorOnDirectFlush() throws Exception {
+        flushByTimeout = false;
+        flushingErrorTest();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void flushingErrorTest() throws Exception {
+        final IgniteEx grid = startGrid(0);
+        grid.active(true);
+
+        IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE);
+
+        final int iterations = 100;
+
+        try {
+            for (int i = 0; i < iterations; i++) {
+                Transaction tx = grid.transactions().txStart(
+                        TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+
+                cache.put(i, "testValue" + i);
+
+                Thread.sleep(100L);
+
+                tx.commitAsync().get();
+            }
+        }
+        catch (Exception expected) {
+            // There can be any exception. Do nothing.
+        }
+
+        // Wait for the node to stop. NOTE(review): the result of waitForCondition is ignored, nothing is asserted.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                return grid.context().gateway().getState() == GridKernalState.STOPPED;
+            }
+        }, getTestTimeout());
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * Creates File I/O which throws on the second call of {@code write(ByteBuffer)} made on each created instance.
+     */
+    private static class FailingFileIOFactory implements FileIOFactory {
+        private static final long serialVersionUID = 0L;
+
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        @Override
+        public FileIO create(File file) throws IOException {
+            return create(file, "rw");
+        }
+
+        @Override
+        public FileIO create(File file, String mode) throws IOException {
+            FileIO delegate = delegateFactory.create(file, mode);
+
+            return new FileIODecorator(delegate) {
+                int writeAttempts = 2;
+
+                @Override
+                public int write(ByteBuffer sourceBuffer) throws IOException {
+                    if (--writeAttempts == 0)
+                        throw new RuntimeException("Test exception. Unable to write to file.");
+
+                    return super.write(sourceBuffer);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index 303f14e..b93c74d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -23,7 +23,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
@@ -47,9 +50,10 @@ public class IgniteDataIntegrityTests extends TestCase {
 
         randomAccessFile = new RandomAccessFile(file, "rw");
 
-        fileInput = new FileInput(randomAccessFile.getChannel(), ByteBuffer.allocate(1024));
-
-        PureJavaCrc32 pureJavaCrc32 = new PureJavaCrc32();
+        fileInput = new FileInput(
+            new RandomAccessFileIO(randomAccessFile),
+            new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN)
+        );
 
         ByteBuffer buf = ByteBuffer.allocate(1024);
         ThreadLocalRandom curr = ThreadLocalRandom.current();


Mime
View raw message