ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [21/50] [abbrv] ignite git commit: IGNITE-6228 Avoid closing page store by thread interruption. Fixes #2715
Date Fri, 13 Oct 2017 17:43:06 GMT
IGNITE-6228 Avoid closing page store by thread interruption. Fixes #2715


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/111d8abb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/111d8abb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/111d8abb

Branch: refs/heads/ignite-2.1.5-p1
Commit: 111d8abbe6ec7710c7f0e7ebe6d43f3ccb904dcb
Parents: be8afd4
Author: Alexei Scherbakov <alexey.scherbakoff@gmail.com>
Authored: Thu Sep 21 17:40:16 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Thu Sep 21 18:10:22 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   5 +
 .../PersistentStoreConfiguration.java           |   6 +-
 .../GridCacheDatabaseSharedManager.java         |   7 +-
 .../cache/persistence/file/AsyncFileIO.java     | 218 +++++++++++++++++++
 .../persistence/file/AsyncFileIOFactory.java    |  52 +++++
 .../cache/persistence/file/FileIOFactory.java   |  25 ++-
 .../cache/persistence/file/FilePageStore.java   |   5 +-
 .../file/FileVersionCheckingFactory.java        |   2 +-
 .../persistence/file/RandomAccessFileIO.java    |  48 ++--
 .../file/RandomAccessFileIOFactory.java         |  14 +-
 .../wal/AbstractWalRecordsIterator.java         |   2 +-
 .../cache/persistence/wal/FileInput.java        |   7 +
 .../wal/FileWriteAheadLogManager.java           |   8 +-
 .../reader/StandaloneWalRecordsIterator.java    |   4 +-
 .../internal/util/future/GridFutureAdapter.java |  16 ++
 .../resources/META-INF/classnames.properties    |   2 +
 .../file/IgnitePdsThreadInterruptionTest.java   | 205 +++++++++++++++++
 .../db/wal/IgniteWalFlushFailoverTest.java      |  22 +-
 .../db/wal/crc/IgniteDataIntegrityTests.java    |  39 ++--
 .../development/utils/IgniteWalConverter.java   |   1 -
 .../IgnitePdsWithIndexingCoreTestSuite.java     |   2 +
 21 files changed, 606 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 30d5339..628b165 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -706,6 +706,11 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
 
     /**
+     * If this property is set to {@code true}, Ignite will use the asynchronous file I/O factory by default.
+     */
+    public static final String IGNITE_USE_ASYNC_FILE_IO_FACTORY = "IGNITE_USE_ASYNC_FILE_IO_FACTORY";
+
+    /**
      * If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records
      * will be logged to WAL.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index 888bf42..abca5a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -17,6 +17,8 @@
 package org.apache.ignite.configuration;
 
 import java.io.Serializable;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -137,7 +139,9 @@ public class PersistentStoreConfiguration implements Serializable {
     private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES;
 
     /** Factory to provide I/O interface for files */
-    private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory();
+    private FileIOFactory fileIOFactory =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, false) ?
+        new AsyncFileIOFactory() : new RandomAccessFileIOFactory();
 
     /**
      * Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index eef667e..277143c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -144,6 +144,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.nio.file.StandardOpenOption.READ;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 
@@ -742,7 +743,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param partFile Partition file.
      */
     private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
-        try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile(), "r")) {
+        try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile())) {
             int minimalHdr = FilePageStore.HEADER_SIZE;
 
             if (fileIO.size() < minimalHdr)
@@ -961,7 +962,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 checkpointLock.readLock().unlock();
 
                 try {
-                    checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.get();
+                    checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.getUninterruptibly();
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException("Failed to wait for checkpoint begin.", e);
@@ -1390,7 +1391,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
         buf.position(0);
 
-        try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), StandardOpenOption.READ)) {
+        try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
             ch.read(buf);
 
             buf.flip();

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
new file mode 100644
index 0000000..8fad7a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * File I/O implementation based on {@link AsynchronousFileChannel}.
+ */
+public class AsyncFileIO implements FileIO {
+    /**
+     * Asynchronous file channel.
+     */
+    private final AsynchronousFileChannel ch;
+
+    /**
+     * Channel's position.
+     */
+    private volatile long position;
+
+    /** */
+    private final ThreadLocal<ChannelOpFuture> holder;
+
+    /** */
+    private GridConcurrentHashSet<ChannelOpFuture> asyncFuts = new GridConcurrentHashSet<>();
+
+    /**
+     * Creates I/O implementation for specified {@code file}
+     * @param file File.
+     * @param modes Open modes.
+     */
+    public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption... modes) throws IOException {
+        this.ch = AsynchronousFileChannel.open(file.toPath(), modes);
+
+        this.holder = holder;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() throws IOException {
+        return position;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(long newPosition) throws IOException {
+        this.position = newPosition;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        ch.read(destinationBuffer, position, this, fut);
+
+        try {
+            return fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        ch.read(destinationBuffer, position, null, fut);
+
+        try {
+            return fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+        finally {
+            asyncFuts.remove(fut);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        ch.read(ByteBuffer.wrap(buffer, offset, length), position, this, fut);
+
+        try {
+            return fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        ch.write(sourceBuffer, position, this, fut);
+
+        try {
+            return fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        asyncFuts.add(fut);
+
+        ch.write(sourceBuffer, position, null, fut);
+
+        try {
+            return fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+        finally {
+            asyncFuts.remove(fut);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] buffer, int offset, int length) throws IOException {
+        ChannelOpFuture fut = holder.get();
+        fut.reset();
+
+        ch.write(ByteBuffer.wrap(buffer, offset, length), position, this, fut);
+
+        try {
+            fut.getUninterruptibly();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        ch.force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size() throws IOException {
+        return ch.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() throws IOException {
+        ch.truncate(0);
+
+        this.position = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        for (ChannelOpFuture asyncFut : asyncFuts) {
+            try {
+                asyncFut.getUninterruptibly(); // Ignore interrupts while waiting for channel close.
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+
+        ch.close();
+    }
+
+    /** */
+    static class ChannelOpFuture extends GridFutureAdapter<Integer> implements CompletionHandler<Integer, AsyncFileIO>  {
+        /** {@inheritDoc} */
+        @Override public void completed(Integer result, AsyncFileIO attachment) {
+            if (attachment != null) {
+                if (result != -1)
+                    attachment.position += result;
+            }
+
+            // Release waiter and allow next operation to begin.
+            super.onDone(result, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void failed(Throwable exc, AsyncFileIO attachment) {
+            super.onDone(exc);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
new file mode 100644
index 0000000..0fb3052
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * File I/O factory which uses {@link AsynchronousFileChannel} based implementation of FileIO.
+ */
+public class AsyncFileIOFactory implements FileIOFactory {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file) throws IOException {
+        return create(file, CREATE, READ, WRITE);
+    }
+
+    /** */
+    private ThreadLocal<AsyncFileIO.ChannelOpFuture> holder = new ThreadLocal<AsyncFileIO.ChannelOpFuture>() {
+        @Override protected AsyncFileIO.ChannelOpFuture initialValue() {
+            return new AsyncFileIO.ChannelOpFuture();
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+        return new AsyncFileIO(file, holder, modes);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
index 0ffc653..c3a75f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -20,26 +20,29 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.OpenOption;
 
+/**
+ * {@link FileIO} factory definition.
+ */
 public interface FileIOFactory extends Serializable {
-
     /**
-     * Creates I/O interface for file with default I/O mode
+     * Creates I/O interface for file with default I/O mode.
      *
-     * @param file File
-     * @return File I/O interface
-     * @throws IOException If I/O interface creation was failed
+     * @param file File.
+     * @return File I/O interface.
+     * @throws IOException If I/O interface creation was failed.
      */
-    FileIO create(File file) throws IOException;
+    public FileIO create(File file) throws IOException;
 
     /**
-     * Creates I/O interface for file with specified mode
+     * Creates I/O interface for file with specified mode.
      *
      * @param file File
-     * @param mode I/O mode in
-     * @return File I/O interface
-     * @throws IOException If I/O interface creation was failed
+     * @param modes Open modes.
+     * @return File I/O interface.
+     * @throws IOException If I/O interface creation was failed.
      */
-    FileIO create(File file, String mode) throws IOException;
+    public FileIO create(File file, OpenOption... modes) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 6052a7c..98764a2 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -35,6 +35,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDat
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
 /**
@@ -400,7 +403,7 @@ public class FilePageStore implements PageStore {
                     IgniteCheckedException err = null;
 
                     try {
-                        this.fileIO = fileIO = ioFactory.create(cfgFile, "rw");
+                        this.fileIO = fileIO = ioFactory.create(cfgFile, CREATE, READ, WRITE);
 
                         if (cfgFile.length() == 0)
                             allocated.set(initFile());

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
index 53bd802..40870dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
@@ -55,7 +55,7 @@ public class FileVersionCheckingFactory implements FilePageStoreFactory {
         if (!file.exists())
             return createPageStore(type, file, latestVersion());
 
-        try (FileIO fileIO = fileIOFactory.create(file, "r")) {
+        try (FileIO fileIO = fileIOFactory.create(file)) {
             int minHdr = FilePageStore.HEADER_SIZE;
 
             if (fileIO.size() < minHdr)

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 73a560a..55849fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -17,94 +17,88 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.file;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.OpenOption;
 
 /**
- * File I/O implementation based on {@code java.io.RandomAccessFile}.
+ * File I/O implementation based on {@link FileChannel}.
  */
 public class RandomAccessFileIO implements FileIO {
-
-    /**
-     * Random access file associated with this I/O
-     */
-    private final RandomAccessFile file;
-
     /**
-     * File channel associated with {@code file}
+     * File channel.
      */
-    private final FileChannel channel;
+    private final FileChannel ch;
 
     /**
      * Creates I/O implementation for specified {@code file}
      *
-     * @param file Random access file
+     * @param file File.
+     * @param modes Open modes.
      */
-    public RandomAccessFileIO(RandomAccessFile file) {
-        this.file = file;
-        this.channel = file.getChannel();
+    public RandomAccessFileIO(File file, OpenOption... modes) throws IOException {
+        ch = FileChannel.open(file.toPath(), modes);
     }
 
     /** {@inheritDoc} */
     @Override public long position() throws IOException {
-        return channel.position();
+        return ch.position();
     }
 
     /** {@inheritDoc} */
     @Override public void position(long newPosition) throws IOException {
-        channel.position(newPosition);
+        ch.position(newPosition);
     }
 
     /** {@inheritDoc} */
     @Override public int read(ByteBuffer destinationBuffer) throws IOException {
-        return channel.read(destinationBuffer);
+        return ch.read(destinationBuffer);
     }
 
     /** {@inheritDoc} */
     @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException {
-        return channel.read(destinationBuffer, position);
+        return ch.read(destinationBuffer, position);
     }
 
     /** {@inheritDoc} */
     @Override public int read(byte[] buffer, int offset, int length) throws IOException {
-        return file.read(buffer, offset, length);
+        return ch.read(ByteBuffer.wrap(buffer, offset, length));
     }
 
     /** {@inheritDoc} */
     @Override public int write(ByteBuffer sourceBuffer) throws IOException {
-        return channel.write(sourceBuffer);
+        return ch.write(sourceBuffer);
     }
 
     /** {@inheritDoc} */
     @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException {
-        return channel.write(sourceBuffer, position);
+        return ch.write(sourceBuffer, position);
     }
 
     /** {@inheritDoc} */
     @Override public void write(byte[] buffer, int offset, int length) throws IOException {
-        file.write(buffer, offset, length);
+        ch.write(ByteBuffer.wrap(buffer, offset, length));
     }
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
-        channel.force(false);
+        ch.force(false);
     }
 
     /** {@inheritDoc} */
     @Override public long size() throws IOException {
-        return channel.size();
+        return ch.size();
     }
 
     /** {@inheritDoc} */
     @Override public void clear() throws IOException {
-        channel.position(0);
-        file.setLength(0);
+        ch.truncate(0);
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        file.close();
+        ch.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
index 6b731f2..856ba1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.file;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 
 /**
  * File I/O factory which provides RandomAccessFileIO implementation of FileIO.
@@ -30,13 +34,11 @@ public class RandomAccessFileIOFactory implements FileIOFactory {
 
     /** {@inheritDoc} */
     @Override public FileIO create(File file) throws IOException {
-        return create(file, "rw");
+        return create(file, CREATE, READ, WRITE);
     }
 
     /** {@inheritDoc} */
-    @Override public FileIO create(File file, String mode) throws IOException {
-        RandomAccessFile rf = new RandomAccessFile(file, mode);
-
-        return new RandomAccessFileIO(rf);
+    @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+        return new RandomAccessFileIO(file, modes);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 2749d5c..d5a2555 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -258,7 +258,7 @@ public abstract class AbstractWalRecordsIterator
         @Nullable final FileWALPointer start)
         throws IgniteCheckedException, FileNotFoundException {
         try {
-            FileIO fileIO = ioFactory.create(desc.file, "r");
+            FileIO fileIO = ioFactory.create(desc.file);
 
             try {
                 FileInput in = new FileInput(fileIO, buf);

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
index 74edbfa..3b20fce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -63,6 +63,13 @@ public final class FileInput implements ByteBufferBackedDataInput {
     }
 
     /**
+     * File I/O.
+     */
+    public FileIO io() {
+        return io;
+    }
+
+    /**
      * Clear buffer.
      */
     private void clearBuffer() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 18584a8..87069d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -79,6 +79,10 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
 /**
  * File WAL manager.
  */
@@ -940,7 +944,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (log.isDebugEnabled())
             log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
-        try (FileIO fileIO = ioFactory.create(file, "rw")) {
+        try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
             int left = psCfg.getWalSegmentSize();
 
             if (mode == WALMode.DEFAULT) {
@@ -1365,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
                 if (mode == WALMode.DEFAULT) {
-                    try (FileIO f0 = ioFactory.create(dstFile, "rw")) {
+                    try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
                         f0.force();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 900aab5..c92d572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -181,7 +181,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
 
     /**
      * This methods checks all provided files to be correct WAL segment.
-     * Header record and its position is checked. WAL position is used to deremine real index.
+     * Header record and its position is checked. WAL position is used to determine real index.
      * File index from file name is ignored.
      *
      * @param allFiles files to scan
@@ -202,7 +202,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
             FileWALPointer ptr;
 
             try (
-                FileIO fileIO = ioFactory.create(file, "r");
+                FileIO fileIO = ioFactory.create(file);
                 ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
             ) {
                 final DataInput in = new FileInput(fileIO, buf);

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 323babd..f8c0b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -499,6 +500,21 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
     }
 
     /**
+     * Resets future for subsequent reuse.
+     */
+    public void reset() {
+        final Object oldState = state;
+
+        if (oldState == INIT)
+            return;
+
+        if (!isDone(oldState))
+            throw new IgniteException("Illegal state");
+
+        compareAndSetState(oldState, INIT);
+    }
+
+    /**
      * Callback to notify that future is cancelled.
      *
      * @return {@code True} if cancel flag was set by this call.

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 2fb8f4b..ad3846f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2090,3 +2090,5 @@ org.apache.ignite.transactions.TransactionRollbackException
 org.apache.ignite.transactions.TransactionState
 org.apache.ignite.transactions.TransactionTimeoutException
 org.apache.ignite.util.AttributeNodeFilter
+org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIO
+org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
new file mode 100644
index 0000000..aab569a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests that interruptions of writing threads do not affect PDS.
+ */
+public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
+    /** */
+    private static final int PAGE_SIZE = 1 << 12; // 4096
+
+    /** */
+    public static final int THREADS_CNT = 1;
+
+    /**
+     * Cache name.
+     */
+    private final String cacheName = "cache";
+
+    /** */
+    private volatile boolean stop = false;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPersistentStoreConfiguration(storeConfiguration());
+
+        cfg.setMemoryConfiguration(memoryConfiguration());
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName));
+
+        return cfg;
+    }
+
+    /**
+     * @return Store config.
+     */
+    private PersistentStoreConfiguration storeConfiguration() {
+        PersistentStoreConfiguration cfg = new PersistentStoreConfiguration();
+
+        cfg.setWalMode(WALMode.LOG_ONLY);
+
+        cfg.setWalFsyncDelayNanos(0);
+
+        cfg.setFileIOFactory(new AsyncFileIOFactory());
+
+        return cfg;
+    }
+
+    /**
+     * @return Memory config.
+     */
+    private MemoryConfiguration memoryConfiguration() {
+        final MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+        // memPlcCfg.setPageEvictionMode(RANDOM_LRU); TODO Fix NPE on start.
+        memPlcCfg.setName("dfltMemPlc");
+
+        memCfg.setPageSize(PAGE_SIZE);
+        memCfg.setConcurrencyLevel(1);
+        memCfg.setMemoryPolicies(memPlcCfg);
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        return memCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        deleteWorkFiles();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        deleteWorkFiles();
+    }
+
+    /**
+     * Tests interruptions on WAL write.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInterruptsOnWALWrite() throws Exception {
+        final Ignite ignite = startGrid();
+
+        ignite.active(true);
+
+        final int valLen = 8192;
+
+        final byte[] payload = new byte[valLen];
+
+        final int maxKey = 100_000;
+
+        Thread[] workers = new Thread[THREADS_CNT];
+
+        final AtomicReference<Throwable> fail = new AtomicReference<>();
+
+        Runnable clo = new Runnable() {
+            @Override public void run() {
+                IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+                while (!stop)
+                    cache.put(ThreadLocalRandom8.current().nextInt(maxKey), payload);
+            }
+        };
+
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(clo);
+            workers[i].setName("writer-" + i);
+            workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                @Override public void uncaughtException(Thread t, Throwable e) {
+                    fail.compareAndSet(null, e);
+                }
+            });
+        }
+
+        for (Thread worker : workers)
+            worker.start();
+
+        Thread.sleep(3_000);
+
+        // Interrupts should not affect writes.
+        for (Thread worker : workers)
+            worker.interrupt();
+
+        Thread.sleep(3_000);
+
+        stop = true;
+
+        for (Thread worker : workers)
+            worker.join();
+
+        Throwable t = fail.get();
+
+        assert t == null : t;
+
+        IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+        int verifiedKeys = 0;
+
+        // Post check.
+        for (int i = 0; i < maxKey; i++) {
+            byte[] val = (byte[]) cache.get(i);
+
+            if (val != null) {
+                assertEquals("Illegal length", valLen, val.length);
+
+                verifiedKeys++;
+            }
+        }
+
+        log.info("Verified keys: " + verifiedKeys);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index cad10ae..048e8bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -42,12 +42,16 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import java.nio.file.OpenOption;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 
 /**
  *
  */
 public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
-
     /** */
     private static final String TEST_CACHE = "testCache";
 
@@ -168,22 +172,22 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest {
     private static class FailingFileIOFactory implements FileIOFactory {
         private static final long serialVersionUID = 0L;
 
+        /** */
         private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
 
-        @Override
-        public FileIO create(File file) throws IOException {
-            return create(file, "rw");
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, READ, WRITE);
         }
 
-        @Override
-        public FileIO create(File file, String mode) throws IOException {
-            FileIO delegate = delegateFactory.create(file, mode);
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            FileIO delegate = delegateFactory.create(file, modes);
 
             return new FileIODecorator(delegate) {
                 int writeAttempts = 2;
 
-                @Override
-                public int write(ByteBuffer sourceBuffer) throws IOException {
+                @Override public int write(ByteBuffer sourceBuffer) throws IOException {
                     if (--writeAttempts == 0)
                         throw new RuntimeException("Test exception. Unable to write to file.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index e4874d9..3d52507 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -21,11 +21,11 @@ import junit.framework.TestCase;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
@@ -38,9 +38,6 @@ public class IgniteDataIntegrityTests extends TestCase {
     /** File input. */
     private FileInput fileInput;
 
-    /** Random access file. */
-    private RandomAccessFile randomAccessFile;
-
     /** Buffer expander. */
     private ByteBufferExpander expBuf;
 
@@ -51,13 +48,13 @@ public class IgniteDataIntegrityTests extends TestCase {
         File file = File.createTempFile("integrity", "dat");
         file.deleteOnExit();
 
-        randomAccessFile = new RandomAccessFile(file, "rw");
-
         expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN);
 
+        FileIOFactory factory = new RandomAccessFileIOFactory();
+
         fileInput = new FileInput(
-            new RandomAccessFileIO(randomAccessFile),
-            expBuf
+                factory.create(file),
+                expBuf
         );
 
         ByteBuffer buf = ByteBuffer.allocate(1024);
@@ -71,13 +68,15 @@ public class IgniteDataIntegrityTests extends TestCase {
             buf.putInt(PureJavaCrc32.calcCrc32(buf, 12));
         }
 
-        randomAccessFile.write(buf.array());
-        randomAccessFile.getFD().sync();
+        buf.rewind();
+
+        fileInput.io().write(buf);
+        fileInput.io().force();
     }
 
     /** {@inheritDoc} */
     @Override protected void tearDown() throws Exception {
-        randomAccessFile.close();
+        fileInput.io().close();
         expBuf.close();
     }
 
@@ -177,22 +176,24 @@ public class IgniteDataIntegrityTests extends TestCase {
      */
     private void toggleOneRandomBit(int rangeFrom, int rangeTo) throws IOException {
         int pos = ThreadLocalRandom.current().nextInt(rangeFrom, rangeTo);
-        randomAccessFile.seek(pos);
+        fileInput.io().position(pos);
+
+        byte[] buf = new byte[1];
 
-        byte b = randomAccessFile.readByte();
+        fileInput.io().read(buf, 0, 1);
 
-        b ^=  (1 << 3);
+        buf[0] ^= (1 << 3);
 
-        randomAccessFile.seek(pos);
-        randomAccessFile.writeByte(b);
-        randomAccessFile.getFD().sync();
+        fileInput.io().position(pos);
+        fileInput.io().write(buf, 0, 1);
+        fileInput.io().force();
     }
 
     /**
      *
      */
     private void checkIntegrity() throws Exception {
-        randomAccessFile.seek(0);
+        fileInput.io().position(0);
 
         for (int i = 0; i < 1024 / 16; i++) {
             try(FileInput.Crc32CheckingFileInput in = fileInput.startRead(false)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
----------------------------------------------------------------------
diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
index b7c3cb7..f3268d9 100644
--- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
+++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java
@@ -21,7 +21,6 @@ import java.io.File;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;

http://git-wip-us.apache.org/repos/asf/ignite/blob/111d8abb/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index ae8ea18..cfbe2e0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMulti
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
 
@@ -53,6 +54,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
 
         suite.addTestSuite(IgnitePdsBinaryMetadataOnClusterRestartTest.class);
         suite.addTestSuite(IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class);
+        suite.addTestSuite(IgnitePdsThreadInterruptionTest.class);
 
         return suite;
     }


Mime
View raw message