ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ira...@apache.org
Subject ignite git commit: IGNITE-8749 Exception for "no space left" situation should be propagated to FailureHandler - Fixes #4200.
Date Wed, 20 Jun 2018 14:02:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/master b80ccda4f -> 3fff8a85e


IGNITE-8749 Exception for "no space left" situation should be propagated to FailureHandler - Fixes #4200.

Signed-off-by: Ivan Rakov <irakov@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fff8a85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fff8a85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fff8a85

Branch: refs/heads/master
Commit: 3fff8a85e0b3297883b6bdae3b46e4d34352c630
Parents: b80ccda
Author: Dmitriy Sorokin <d.w.sorokin@gmail.com>
Authored: Wed Jun 20 16:41:46 2018 +0300
Committer: Ivan Rakov <irakov@apache.org>
Committed: Wed Jun 20 16:41:46 2018 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           | 190 +++++++-------
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 235 +++++++++--------
 .../ignite/failure/TestFailureHandler.java      |  19 ++
 .../db/wal/IgniteWalFormatFileFailoverTest.java | 258 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 5 files changed, 513 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5703d17..3ca51f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -604,48 +604,43 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
-        try {
-            assert currHnd == null;
-            assert lastPtr == null || lastPtr instanceof FileWALPointer;
+        assert currHnd == null;
+        assert lastPtr == null || lastPtr instanceof FileWALPointer;
 
-            FileWALPointer filePtr = (FileWALPointer)lastPtr;
+        FileWALPointer filePtr = (FileWALPointer)lastPtr;
 
-            walWriter = new WALWriter(log);
+        walWriter = new WALWriter(log);
 
-            if (!mmap)
-                new IgniteThread(walWriter).start();
+        if (!mmap)
+            new IgniteThread(walWriter).start();
 
-            currHnd = restoreWriteHandle(filePtr);
+        currHnd = restoreWriteHandle(filePtr);
 
-            // For new handle write serializer version to it.
-            if (filePtr == null)
-                currHnd.writeHeader();
+        // For new handle write serializer version to it.
+        if (filePtr == null)
+            currHnd.writeHeader();
 
-            if (currHnd.serializer.version() != serializer.version()) {
-                if (log.isInfoEnabled())
-                    log.info("Record serializer version change detected, will start logging with a new WAL record " +
-                        "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() +
-                        ", oldVer=" + currHnd.serializer.version() + ']');
+        if (currHnd.serializer.version() != serializer.version()) {
+            if (log.isInfoEnabled())
+                log.info("Record serializer version change detected, will start logging with a new WAL record " +
+                    "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() +
+                    ", oldVer=" + currHnd.serializer.version() + ']');
 
-                rollOver(currHnd);
-            }
+            rollOver(currHnd);
+        }
 
-            currHnd.resume = false;
+        currHnd.resume = false;
 
-            if (mode == WALMode.BACKGROUND) {
-                backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
-                    @Override public void run() {
-                        doFlush();
-                    }
-                }, flushFreq, flushFreq);
-            }
-
-            if (walAutoArchiveAfterInactivity > 0)
-                scheduleNextInactivityPeriodElapsedCheck();
-        }
-        catch (StorageException e) {
-            throw new IgniteCheckedException(e);
+        if (mode == WALMode.BACKGROUND) {
+            backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
+                @Override public void run() {
+                    doFlush();
+                }
+            }, flushFreq, flushFreq);
         }
+
+        if (walAutoArchiveAfterInactivity > 0)
+            scheduleNextInactivityPeriodElapsedCheck();
     }
 
     /**
@@ -1130,9 +1125,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      * @param lastReadPtr Last read WAL file pointer.
      * @return Initialized file write handle.
-     * @throws IgniteCheckedException If failed to initialize WAL write handle.
+     * @throws StorageException If failed to initialize WAL write handle.
      */
-    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
+    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException {
         long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
         @Nullable FileArchiver archiver0 = archiver;
@@ -1174,14 +1169,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 SegmentedRingByteBuffer rbuf;
 
                 if (mmap) {
-                    try {
-                        MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
+                    MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
 
-                        rbuf = new SegmentedRingByteBuffer(buf, metrics);
-                    }
-                    catch (IOException e) {
-                        throw new IgniteCheckedException(e);
-                    }
+                    rbuf = new SegmentedRingByteBuffer(buf, metrics);
                 }
                 else
                     rbuf = new SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, metrics);
@@ -1205,13 +1195,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
-                fileIO.close();
+                try {
+                    fileIO.close();
+                }
+                catch (IOException suppressed) {
+                    e.addSuppressed(suppressed);
+                }
 
-                throw e;
+                if (e instanceof StorageException)
+                    throw (StorageException) e;
+
+                throw e instanceof IOException ? (IOException) e : new IOException(e);
             }
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
+            throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
         }
     }
 
@@ -1221,10 +1219,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      *
      * @param cur Current file write handle released by WAL writer
      * @return Initialized file handle.
-     * @throws StorageException If IO exception occurred.
-     * @throws IgniteCheckedException If failed.
+     * @throws IgniteCheckedException If exception occurred.
      */
-    private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
+    private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws IgniteCheckedException {
+        IgniteCheckedException error = null;
+
         try {
             File nextFile = pollNextFile(cur.idx);
 
@@ -1298,19 +1297,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             return hnd;
         }
+        catch (IgniteCheckedException e) {
+            throw error = e;
+        }
         catch (IOException e) {
-            StorageException se = new StorageException("Unable to initialize WAL segment", e);
-
-            cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
-
-            throw se;
+            throw error = new StorageException("Unable to initialize WAL segment", e);
+        }
+        finally {
+            if (error != null)
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error));
         }
     }
 
     /**
      * Deletes temp files, creates and prepares new; Creates first segment if necessary
+     *
+     * @throws StorageException If failed.
      */
-    private void checkOrPrepareFiles() throws IgniteCheckedException {
+    private void checkOrPrepareFiles() throws StorageException {
         // Clean temp files.
         {
             File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
@@ -1320,7 +1324,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     boolean deleted = tmp.delete();
 
                     if (!deleted)
-                        throw new IgniteCheckedException("Failed to delete previously created temp file " +
+                        throw new StorageException("Failed to delete previously created temp file " +
                             "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
                 }
             }
@@ -1330,7 +1334,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         if(isArchiverEnabled())
             if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
-                throw new IgniteCheckedException("Failed to initialize wal (work directory contains " +
+                throw new StorageException("Failed to initialize wal (work directory contains " +
                     "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
 
         // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
@@ -1370,9 +1374,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * Clears whole the file, fills with zeros for Default mode.
      *
      * @param file File to format.
-     * @throws IgniteCheckedException if formatting failed
+     * @throws StorageException if formatting failed
      */
-    private void formatFile(File file) throws IgniteCheckedException {
+    private void formatFile(File file) throws StorageException {
         formatFile(file, dsCfg.getWalSegmentSize());
     }
 
@@ -1381,9 +1385,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      *
      * @param file File to format.
      * @param bytesCntToFormat Count of first bytes to format.
-     * @throws IgniteCheckedException if formatting failed
+     * @throws StorageException if formatting failed
      */
-    private void formatFile(File file, int bytesCntToFormat) throws IgniteCheckedException {
+    private void formatFile(File file, int bytesCntToFormat) throws StorageException {
         if (log.isDebugEnabled())
             log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
@@ -1395,7 +1399,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     int toWrite = Math.min(FILL_BUF.length, left);
 
                     if (fileIO.write(FILL_BUF, 0, toWrite) < toWrite) {
-                        final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend WAL segment file: " +
+                        final StorageException ex = new StorageException("Failed to extend WAL segment file: " +
                             file.getName() + ". Probably disk is too busy, please check your device.");
 
                         if (failureProcessor != null)
@@ -1413,7 +1417,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 fileIO.clear();
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
+            throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
         }
     }
 
@@ -1421,9 +1425,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * Creates a file atomically with temp file.
      *
      * @param file File to create.
-     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If failed.
      */
-    private void createFile(File file) throws IgniteCheckedException {
+    private void createFile(File file) throws StorageException {
         if (log.isDebugEnabled())
             log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
@@ -1435,7 +1439,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             Files.move(tmp.toPath(), file.toPath());
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " +
+            throw new StorageException("Failed to move temp file to a regular WAL segment file: " +
                 file.getAbsolutePath(), e);
         }
 
@@ -1448,9 +1452,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      *
      * @param curIdx Current absolute WAL segment index.
      * @return File ready for use as new WAL segment.
-     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If exception occurred in the archiver thread.
      */
-    private File pollNextFile(long curIdx) throws IgniteCheckedException {
+    private File pollNextFile(long curIdx) throws StorageException {
         FileArchiver archiver0 = archiver;
 
         if (archiver0 == null) {
@@ -1526,7 +1530,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      */
     private class FileArchiver extends GridWorker {
         /** Exception which occurred during initial creation of files or during archiving WAL segment */
-        private IgniteCheckedException cleanErr;
+        private StorageException cleanErr;
 
         /**
          * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during
@@ -1598,15 +1602,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             try {
                 allocateRemainingFiles();
             }
-            catch (IgniteCheckedException e) {
+            catch (StorageException e) {
                 synchronized (this) {
                     // Stop the thread and report to starter.
                     cleanErr = e;
 
                     notifyAll();
-
-                    return;
                 }
+
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
+
+                return;
             }
 
             Throwable err = null;
@@ -1690,9 +1696,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          *
          * @param curIdx Current absolute index that we want to increment.
          * @return Next index (curWalSegmIdx+1) when it is ready to be written.
-         * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
+         * @throws StorageException If exception occurred in the archiver thread.
          */
-        private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
+        private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException {
             synchronized (this) {
                 if (cleanErr != null)
                     throw cleanErr;
@@ -1707,6 +1713,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanErr == null) {
                     try {
                         wait();
+
+                        if (cleanErr != null)
+                            throw cleanErr;
                     }
                     catch (InterruptedException ignore) {
                         interrupted.set(true);
@@ -1714,9 +1723,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 }
 
                 // Wait for formatter so that we do not open an empty file in DEFAULT mode.
-                while (curAbsWalIdx % dsCfg.getWalSegments() > formatted)
+                while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanErr == null)
                     try {
                         wait();
+
+                        if (cleanErr != null)
+                            throw cleanErr;
                     }
                     catch (InterruptedException ignore) {
                         interrupted.set(true);
@@ -1788,7 +1800,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          *
          * @param absIdx Absolute index to archive.
          */
-        private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
+        private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
             long segIdx = absIdx % dsCfg.getWalSegments();
 
             File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
@@ -1817,7 +1829,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 }
             }
             catch (IOException e) {
-                throw new IgniteCheckedException("Failed to archive WAL segment [" +
+                throw new StorageException("Failed to archive WAL segment [" +
                     "srcFile=" + origFile.getAbsolutePath() +
                     ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
             }
@@ -1840,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * Background creation of all segments except first. First segment was created in main thread by {@link
          * FileWriteAheadLogManager#checkOrPrepareFiles()}
          */
-        private void allocateRemainingFiles() throws IgniteCheckedException {
+        private void allocateRemainingFiles() throws StorageException {
             checkFiles(
                 1,
                 true,
@@ -2234,23 +2246,23 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @param startWith Start with.
      * @param create Flag create file.
      * @param p Predicate Exit condition.
-     * @throws IgniteCheckedException if validation or create file fail.
+     * @throws StorageException if validation or create file fail.
      */
     private void checkFiles(
         int startWith,
         boolean create,
         @Nullable IgnitePredicate<Integer> p,
         @Nullable IgniteInClosure<Integer> completionCallback
-    ) throws IgniteCheckedException {
+    ) throws StorageException {
         for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) {
             File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
 
             if (checkFile.exists()) {
                 if (checkFile.isDirectory())
-                    throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " +
+                    throw new StorageException("Failed to initialize WAL log segment (a directory with " +
                         "the same name already exists): " + checkFile.getAbsolutePath());
                 else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC)
-                    throw new IgniteCheckedException("Failed to initialize WAL log segment " +
+                    throw new StorageException("Failed to initialize WAL log segment " +
                         "(WAL segment size change is not supported in 'DEFAULT' WAL mode) " +
                         "[filePath=" + checkFile.getAbsolutePath() +
                         ", fileSize=" + checkFile.length() +
@@ -2650,9 +2662,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * Flush or wait for concurrent flush completion.
          *
          * @param ptr Pointer.
-         * @throws IgniteCheckedException If failed.
          */
-        private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
+        private void flushOrWait(FileWALPointer ptr) {
             if (ptr != null) {
                 // If requested obsolete file index, it must be already flushed by close.
                 if (ptr.index() != idx)
@@ -2664,10 +2675,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * @param ptr Pointer.
-         * @throws IgniteCheckedException If failed.
-         * @throws StorageException If failed.
          */
-        private void flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException {
+        private void flush(FileWALPointer ptr) {
             if (ptr == null) { // Unconditional flush.
                 walWriter.flushAll();
 
@@ -2883,7 +2892,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         }
                     }
                     catch (IOException e) {
-                        throw new IgniteCheckedException(e);
+                        throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e);
                     }
 
                     if (log.isDebugEnabled())
@@ -3364,28 +3373,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * Forces all made changes to the file.
          */
-        void force() throws IgniteCheckedException {
+        void force() {
             flushBuffer(FILE_FORCE);
         }
 
         /**
          * Closes file.
          */
-        void close() throws IgniteCheckedException {
+        void close() {
             flushBuffer(FILE_CLOSE);
         }
 
         /**
          * Flushes all data from the buffer.
          */
-        void flushAll() throws IgniteCheckedException {
+        void flushAll() {
             flushBuffer(UNCONDITIONAL_FLUSH);
         }
 
         /**
          * @param expPos Expected position.
          */
-        void flushBuffer(long expPos) throws StorageException, IgniteCheckedException {
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        void flushBuffer(long expPos) {
             if (mmap)
                 return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 49fbc73..6e59ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -487,37 +487,32 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
 
     /** {@inheritDoc} */
     @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
-        try {
-            assert currentHnd == null;
-            assert lastPtr == null || lastPtr instanceof FileWALPointer;
-
-            FileWALPointer filePtr = (FileWALPointer)lastPtr;
+        assert currentHnd == null;
+        assert lastPtr == null || lastPtr instanceof FileWALPointer;
 
-            currentHnd = restoreWriteHandle(filePtr);
-
-            if (currentHnd.serializer.version() != serializer.version()) {
-                if (log.isInfoEnabled())
-                    log.info("Record serializer version change detected, will start logging with a new WAL record " +
-                        "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
-                        ", oldVer=" + currentHnd.serializer.version() + ']');
+        FileWALPointer filePtr = (FileWALPointer)lastPtr;
 
-                rollOver(currentHnd);
-            }
+        currentHnd = restoreWriteHandle(filePtr);
 
-            if (mode == WALMode.BACKGROUND) {
-                backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
-                    @Override public void run() {
-                        doFlush();
-                    }
-                }, flushFreq, flushFreq);
-            }
+        if (currentHnd.serializer.version() != serializer.version()) {
+            if (log.isInfoEnabled())
+                log.info("Record serializer version change detected, will start logging with a new WAL record " +
+                    "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
+                    ", oldVer=" + currentHnd.serializer.version() + ']');
 
-            if (walAutoArchiveAfterInactivity > 0)
-                scheduleNextInactivityPeriodElapsedCheck();
+            rollOver(currentHnd);
         }
-        catch (StorageException e) {
-            throw new IgniteCheckedException(e);
+
+        if (mode == WALMode.BACKGROUND) {
+            backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
+                @Override public void run() {
+                    doFlush();
+                }
+            }, flushFreq, flushFreq);
         }
+
+        if (walAutoArchiveAfterInactivity > 0)
+            scheduleNextInactivityPeriodElapsedCheck();
     }
 
     /**
@@ -1019,7 +1014,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      * @param cur Handle that failed to fit the given entry.
      * @return Handle that will fit the entry.
      */
-    private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
+    private FileWriteHandle rollOver(FileWriteHandle cur) throws IgniteCheckedException {
         FileWriteHandle hnd = currentHandle();
 
         if (hnd != cur)
@@ -1050,9 +1045,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
     /**
      * @param lastReadPtr Last read WAL file pointer.
      * @return Initialized file write handle.
-     * @throws IgniteCheckedException If failed to initialize WAL write handle.
+     * @throws StorageException If failed to initialize WAL write handle.
      */
-    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
+    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException {
         long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
         long segNo = absIdx % dsCfg.getWalSegments();
@@ -1100,13 +1095,21 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
-                fileIO.close();
+                try {
+                    fileIO.close();
+                }
+                catch (IOException suppressed) {
+                    e.addSuppressed(suppressed);
+                }
 
-                throw e;
+                if (e instanceof StorageException)
+                    throw (StorageException) e;
+
+                throw e instanceof IOException ? (IOException) e : new IOException(e);
             }
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
+            throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
         }
     }
 
@@ -1117,10 +1120,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      *
      * @param curIdx current absolute segment released by WAL writer
      * @return Initialized file handle.
-     * @throws StorageException If IO exception occurred.
-     * @throws IgniteCheckedException If failed.
+     * @throws IgniteCheckedException If exception occurred.
      */
-    private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException {
+    private FileWriteHandle initNextWriteHandle(long curIdx) throws IgniteCheckedException {
+        IgniteCheckedException error = null;
+
         try {
             File nextFile = pollNextFile(curIdx);
 
@@ -1140,19 +1144,24 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
 
             return hnd;
         }
+        catch (IgniteCheckedException e) {
+            throw error = e;
+        }
         catch (IOException e) {
-            StorageException se = new StorageException("Unable to initialize WAL segment", e);
-
-            cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
-
-            throw se;
+            throw error = new StorageException("Unable to initialize WAL segment", e);
+        }
+        finally {
+            if (error != null)
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error));
         }
     }
 
     /**
-     * Deletes temp files, creates and prepares new; Creates first segment if necessary
+     * Deletes temp files, creates and prepares new; Creates first segment if necessary.
+     *
+     * @throws StorageException If failed.
      */
-    private void checkOrPrepareFiles() throws IgniteCheckedException {
+    private void checkOrPrepareFiles() throws StorageException {
         // Clean temp files.
         {
             File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
@@ -1162,7 +1171,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                     boolean deleted = tmp.delete();
 
                     if (!deleted)
-                        throw new IgniteCheckedException("Failed to delete previously created temp file " +
+                        throw new StorageException("Failed to delete previously created temp file " +
                             "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
                 }
             }
@@ -1171,7 +1180,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
 
         if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
-            throw new IgniteCheckedException("Failed to initialize wal (work directory contains " +
+            throw new StorageException("Failed to initialize wal (work directory contains " +
                 "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
 
         // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
@@ -1188,9 +1197,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      * Clears whole the file, fills with zeros for Default mode.
      *
      * @param file File to format.
-     * @throws IgniteCheckedException if formatting failed
+     * @throws StorageException if formatting failed.
      */
-    private void formatFile(File file) throws IgniteCheckedException {
+    private void formatFile(File file) throws StorageException {
         formatFile(file, dsCfg.getWalSegmentSize());
     }
 
@@ -1199,9 +1208,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      *
      * @param file File to format.
      * @param bytesCntToFormat Count of first bytes to format.
-     * @throws IgniteCheckedException if formatting failed
+     * @throws StorageException If formatting failed.
      */
-    private void formatFile(File file, int bytesCntToFormat) throws IgniteCheckedException {
+    private void formatFile(File file, int bytesCntToFormat) throws StorageException {
         if (log.isDebugEnabled())
             log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
@@ -1223,7 +1232,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                 fileIO.clear();
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
+            throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
         }
     }
 
@@ -1231,9 +1240,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      * Creates a file atomically with temp file.
      *
      * @param file File to create.
-     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If failed.
      */
-    private void createFile(File file) throws IgniteCheckedException {
+    private void createFile(File file) throws StorageException {
         if (log.isDebugEnabled())
             log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
 
@@ -1245,7 +1254,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
             Files.move(tmp.toPath(), file.toPath());
         }
         catch (IOException e) {
-            throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " +
+            throw new StorageException("Failed to move temp file to a regular WAL segment file: " +
                 file.getAbsolutePath(), e);
         }
 
@@ -1259,9 +1268,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      *
      * @param curIdx Current absolute WAL segment index.
      * @return File ready for use as new WAL segment.
-     * @throws IgniteCheckedException If failed.
+     * @throws StorageException If exception occurred in the archiver thread.
+     * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private File pollNextFile(long curIdx) throws IgniteCheckedException {
+    private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
         // Signal to archiver that we are done with the segment and it can be archived.
         long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
 
@@ -1318,7 +1328,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      */
     private class FileArchiver extends GridWorker {
         /** Exception which occurred during initial creation of files or during archiving WAL segment */
-        private IgniteCheckedException cleanException;
+        private StorageException cleanException;
 
         /**
          * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
@@ -1426,15 +1436,17 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
             try {
                 allocateRemainingFiles();
             }
-            catch (IgniteCheckedException e) {
+            catch (StorageException e) {
                 synchronized (this) {
                     // Stop the thread and report to starter.
                     cleanException = e;
 
                     notifyAll();
-
-                    return;
                 }
+
+                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
+
+                return;
             }
 
             Throwable err = null;
@@ -1515,9 +1527,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          *
          * @param curIdx Current absolute index that we want to increment.
          * @return Next index (curWalSegmIdx+1) when it is ready to be written.
-         * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
+         * @throws StorageException If exception occurred in the archiver thread.
+         * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
+        private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
             try {
                 synchronized (this) {
                     if (cleanException != null)
@@ -1535,10 +1548,16 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                     while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null))
                         wait();
 
+                    if (cleanException != null)
+                        throw cleanException;
+
                     // Wait for formatter so that we do not open an empty file in DEFAULT mode.
-                    while (curAbsWalIdx % dsCfg.getWalSegments() > formatted)
+                    while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanException == null)
                         wait();
 
+                    if (cleanException != null)
+                        throw cleanException;
+
                     return curAbsWalIdx;
                 }
             }
@@ -1664,7 +1683,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          * Background creation of all segments except first. First segment was created in main thread by
          * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()}
          */
-        private void allocateRemainingFiles() throws IgniteCheckedException {
+        private void allocateRemainingFiles() throws StorageException {
             final FileArchiver archiver = this;
 
             checkFiles(1,
@@ -2029,23 +2048,23 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
      * @param startWith Start with.
      * @param create Flag create file.
      * @param p Predicate Exit condition.
-     * @throws IgniteCheckedException if validation or create file fail.
+     * @throws StorageException If validation or file creation fails.
      */
     private void checkFiles(
         int startWith,
         boolean create,
         @Nullable IgnitePredicate<Integer> p,
         @Nullable IgniteInClosure<Integer> completionCallback
-    ) throws IgniteCheckedException {
+    ) throws StorageException {
         for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) {
             File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
 
             if (checkFile.exists()) {
                 if (checkFile.isDirectory())
-                    throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " +
+                    throw new StorageException("Failed to initialize WAL log segment (a directory with " +
                         "the same name already exists): " + checkFile.getAbsolutePath());
                 else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC)
-                    throw new IgniteCheckedException("Failed to initialize WAL log segment " +
+                    throw new StorageException("Failed to initialize WAL log segment " +
                         "(WAL segment size change is not supported):" + checkFile.getAbsolutePath());
             }
             else if (create)
@@ -2410,7 +2429,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          *
          * @throws IOException If fail to write serializer version.
          */
-        public void writeSerializerVersion() throws IOException {
+        private void writeSerializerVersion() throws IOException {
             try {
                 assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " +
                     fileIO.position();
@@ -2448,9 +2467,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          * @param rec Record to be added to record chain as new {@link #head}
          * @return Pointer or null if roll over to next segment is required or already started by other thread.
          * @throws StorageException If failed.
-         * @throws IgniteCheckedException If failed.
          */
-        @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+        @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException {
             assert rec.size() > 0 || rec.getClass() == FakeRecord.class;
 
             boolean flushed = false;
@@ -2503,9 +2521,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          * Flush or wait for concurrent flush completion.
          *
          * @param ptr Pointer.
-         * @throws IgniteCheckedException If failed.
+         * @throws StorageException If failed.
          */
-        private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
+        private void flushOrWait(FileWALPointer ptr, boolean stop) throws StorageException {
             long expWritten;
 
             if (ptr != null) {
@@ -2549,10 +2567,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         /**
          * @param ptr Pointer.
          * @return {@code true} If the flush really happened.
-         * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
+        private boolean flush(FileWALPointer ptr, boolean stop) throws StorageException {
             if (ptr == null) { // Unconditional flush.
                 for (; ; ) {
                     WALRecord expHead = head.get();
@@ -2594,10 +2611,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
 
         /**
          * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread
-         * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
+        private boolean flush(WALRecord expHead, boolean stop) throws StorageException {
             if (expHead.previous() == null) {
                 FakeRecord frHead = (FakeRecord)expHead;
 
@@ -2643,7 +2659,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                 return true;
             }
             catch (Throwable e) {
-                StorageException se = new StorageException("Unable to write", new IOException(e));
+                StorageException se = e instanceof StorageException ? (StorageException) e :
+                    new StorageException("Unable to write", new IOException(e));
 
                 cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
 
@@ -2725,8 +2742,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         /**
          * @param ptr Pointer to sync.
          * @throws StorageException If failed.
+         * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException {
+        private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteInterruptedCheckedException {
             lock.lock();
 
             try {
@@ -2780,10 +2798,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
 
         /**
          * @return {@code true} If this thread actually closed the segment.
-         * @throws IgniteCheckedException If failed.
          * @throws StorageException If failed.
          */
-        private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
+        private boolean close(boolean rollOver) throws StorageException {
             if (stop.compareAndSet(false, true)) {
                 lock.lock();
 
@@ -2793,43 +2810,49 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                     assert stopped() : "Segment is not closed after close flush: " + head.get();
 
                     try {
-                        RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
-                            .createSerializer(serializerVersion);
+                        try {
+                            RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
+                                .createSerializer(serializerVersion);
 
-                        SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
+                            SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
 
-                        int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
+                            int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
 
-                        if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
-                            final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
+                            if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) {
+                                final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
 
-                            segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize));
-                            backwardSerializer.writeRecord(segmentRecord, buf);
+                                segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize));
+                                backwardSerializer.writeRecord(segmentRecord, buf);
 
-                            buf.rewind();
+                                buf.rewind();
 
-                            int rem = buf.remaining();
+                                int rem = buf.remaining();
 
-                            while (rem > 0) {
-                                int written0 = fileIO.write(buf, written);
+                                while (rem > 0) {
+                                    int written0 = fileIO.write(buf, written);
 
-                                written += written0;
+                                    written += written0;
 
-                                rem -= written0;
+                                    rem -= written0;
+                                }
                             }
                         }
+                        catch (IgniteCheckedException e) {
+                            throw new IOException(e);
+                        }
+                        finally {
+                            assert mode == WALMode.FSYNC;
 
-                        // Do the final fsync.
-                        if (mode == WALMode.FSYNC) {
+                            // Do the final fsync.
                             fileIO.force();
 
                             lastFsyncPos = written;
-                        }
 
-                        fileIO.close();
+                            fileIO.close();
+                        }
                     }
                     catch (IOException e) {
-                        throw new IgniteCheckedException(e);
+                        throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e);
                     }
 
                     if (log.isDebugEnabled())
@@ -2860,9 +2883,17 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
 
                     assert written == lastFsyncPos || mode != WALMode.FSYNC :
                     "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']';
-                }
 
-                fileIO = null;
+                    fileIO = null;
+                }
+                else {
+                    try {
+                        fileIO.close();
+                    }
+                    catch (IOException e) {
+                        U.error(log, "Failed to close WAL file [idx=" + idx + ", fileIO=" + fileIO + "]", e);
+                    }
+                }
 
                 nextSegment.signalAll();
             }
@@ -2872,9 +2903,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         }
 
         /**
-         * @throws IgniteCheckedException If failed.
+         *
          */
-        private void awaitNext() throws IgniteCheckedException {
+        private void awaitNext() {
             lock.lock();
 
             try {
@@ -2894,7 +2925,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
          * @throws IgniteCheckedException If failed.
          */
         @SuppressWarnings("TooBroadScope")
-        private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException {
+        private void writeBuffer(long pos, ByteBuffer buf) throws StorageException {
             boolean interrupted = false;
 
             lock.lock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
index 1159683..545c9ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.failure;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 
 /**
@@ -35,6 +36,13 @@ public class TestFailureHandler implements FailureHandler {
 
     /**
      * @param invalidate Invalidate.
+     */
+    public TestFailureHandler(boolean invalidate) {
+        this(invalidate, new CountDownLatch(1));
+    }
+
+    /**
+     * @param invalidate Invalidate.
      * @param latch Latch.
      */
     public TestFailureHandler(boolean invalidate, CountDownLatch latch) {
@@ -60,4 +68,15 @@ public class TestFailureHandler implements FailureHandler {
     public FailureContext failureContext() {
         return failureCtx;
     }
+
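+    /**
+     * @param millis Maximum time to wait for the latch, in milliseconds.
+     * @return Failure context.
+     * @throws InterruptedException If interrupted while waiting.
+     */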
+    public FailureContext awaitFailure(long millis) throws InterruptedException {
+        latch.await(millis, TimeUnit.MILLISECONDS);
+
+        return failureCtx;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
new file mode 100644
index 0000000..379b8c3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.TestFailureHandler;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
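+/**
+ * Tests that an I/O error during WAL segment formatting surfaces as {@link StorageException} (IGNITE-8749).
+ */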
+public class IgniteWalFormatFileFailoverTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_CACHE = "testCache";
+
+    /** Name of the method (WAL segment formatting) whose file I/O calls are made to fail. */
+    private static final String formatFile = "formatFile";
+
+    /** Fail method name reference. */
+    private final AtomicReference<String> failMtdNameRef = new AtomicReference<>();
+
+    /** If {@code true}, WAL mode is {@link WALMode#FSYNC}, otherwise {@link WALMode#BACKGROUND}. */
+    private boolean fsync;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(TEST_CACHE)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(2048L * 1024 * 1024)
+                .setPersistenceEnabled(true))
+            .setWalMode(fsync ? WALMode.FSYNC : WALMode.BACKGROUND)
+            .setWalBufferSize(1024 * 1024)
+            .setWalSegmentSize(512 * 1024)
+            .setFileIOFactory(new FailingFileIOFactory(failMtdNameRef));
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        cfg.setFailureHandler(new TestFailureHandler(false));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeStartFailedFsync() throws Exception {
+        fsync = true;
+
+        failMtdNameRef.set(formatFile);
+
+        checkCause(GridTestUtils.assertThrows(log, () -> startGrid(0), IgniteCheckedException.class, null));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailureHandlerTriggeredFsync() throws Exception {
+        fsync = true;
+
+        failFormatFileOnClusterActivate();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailureHandlerTriggered() throws Exception {
+        fsync = false;
+
+        failFormatFileOnClusterActivate();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void failFormatFileOnClusterActivate() throws Exception {
+        failMtdNameRef.set(null);
+
+        startGrid(0);
+        startGrid(1);
+
+        if (!fsync) {
+            setFileIOFactory(grid(0).context().cache().context().wal());
+            setFileIOFactory(grid(1).context().cache().context().wal());
+        }
+
+        failMtdNameRef.set(formatFile);
+
+        grid(0).cluster().active(true);
+
+        checkCause(failureHandler(0).awaitFailure(2000).error());
+        checkCause(failureHandler(1).awaitFailure(2000).error());
+    }
+
+    /**
+     * @param mtdName Method name.
+     */
+    private static boolean isCalledFrom(String mtdName) {
+        return isCalledFrom(Thread.currentThread().getStackTrace(), mtdName);
+    }
+
+    /**
+     * @param stackTrace Stack trace.
+     * @param mtdName Method name.
+     */
+    private static boolean isCalledFrom(StackTraceElement[] stackTrace, String mtdName) {
+        return Arrays.stream(stackTrace).map(StackTraceElement::getMethodName).anyMatch(mtdName::equals);
+    }
+
+    /**
+     * @param gridIdx Grid index.
+     * @return Failure handler configured for grid with given index.
+     */
+    private TestFailureHandler failureHandler(int gridIdx) {
+        FailureHandler hnd = grid(gridIdx).configuration().getFailureHandler();
+
+        assertTrue(hnd instanceof TestFailureHandler);
+
+        return (TestFailureHandler)hnd;
+    }
+
+    /**
+     * @param t Throwable.
+     */
+    private void checkCause(Throwable t) {
+        StorageException e = X.cause(t, StorageException.class);
+
+        assertNotNull(e);
+        assertNotNull(e.getMessage());
+        assertTrue(e.getMessage().contains("Failed to format WAL segment file"));
+
+        IOException ioe = X.cause(e, IOException.class);
+
+        assertNotNull(ioe);
+        assertNotNull(ioe.getMessage());
+        assertTrue(ioe.getMessage().contains("No space left on device"));
+
+        assertTrue(isCalledFrom(ioe.getStackTrace(), formatFile));
+    }
+
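+    /** Installs a failing file I/O factory into the given WAL manager (must be a FileWriteAheadLogManager). */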
+    private void setFileIOFactory(IgniteWriteAheadLogManager wal) {
+        if (wal instanceof FileWriteAheadLogManager)
+            ((FileWriteAheadLogManager)wal).setFileIOFactory(new FailingFileIOFactory(failMtdNameRef));
+        else
+            fail(wal.getClass().toString());
+    }
+
+    /**
+     * Create File I/O which fails if specific method call present in stack trace.
+     */
+    private static class FailingFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /** Fail method name reference. */
+        private final AtomicReference<String> failMtdNameRef;
+
+        /**
+         * @param failMtdNameRef Fail method name reference.
+         */
+        FailingFileIOFactory(AtomicReference<String> failMtdNameRef) {
+            assertNotNull(failMtdNameRef);
+
+            this.failMtdNameRef = failMtdNameRef;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, READ, WRITE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, modes);
+
+            return new FileIODecorator(delegate) {
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    conditionalFail();
+
+                    return super.write(buf, off, len);
+                }
+
+                @Override public void clear() throws IOException {
+                    conditionalFail();
+
+                    super.clear();
+                }
+
+                private void conditionalFail() throws IOException {
+                    String failMtdName = failMtdNameRef.get();
+
+                    if (failMtdName != null && isCalledFrom(failMtdName))
+                        throw new IOException("No space left on device");
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 6a4f9fe..3b46761 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFsyncWithMmapBufferSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlyWithMmapBufferSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
@@ -150,6 +151,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteWalFlushLogOnlyWithMmapBufferSelfTest.class);
 
+        suite.addTestSuite(IgniteWalFormatFileFailoverTest.class);
+
         // Test suite uses Standalone WAL iterator to verify PDS content.
         suite.addTestSuite(IgniteWalReaderTest.class);
 


Mime
View raw message