ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ira...@apache.org
Subject ignite git commit: IGNITE-8748 All FileIO#write methods should return number of written bytes - Fixes #4170.
Date Fri, 15 Jun 2018 13:56:14 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 312acd019 -> b4b965bbd


IGNITE-8748 All FileIO#write methods should return number of written bytes - Fixes #4170.

Signed-off-by: Ivan Rakov <irakov@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4b965bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4b965bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4b965bb

Branch: refs/heads/master
Commit: b4b965bbd9caddee8f28c64689957f32d181eadf
Parents: 312acd0
Author: Stelmak Alexey <alexey.stelmak@corp.mail.ru>
Authored: Fri Jun 15 16:56:07 2018 +0300
Committer: Ivan Rakov <irakov@apache.org>
Committed: Fri Jun 15 16:56:07 2018 +0300

----------------------------------------------------------------------
 .../cache/persistence/file/AsyncFileIO.java     |  4 +--
 .../cache/persistence/file/FileIO.java          |  4 ++-
 .../cache/persistence/file/FileIODecorator.java |  4 +--
 .../persistence/file/RandomAccessFileIO.java    |  4 +--
 .../cache/persistence/file/UnzipFileIO.java     |  2 +-
 .../wal/FileWriteAheadLogManager.java           | 28 +++++++++++++++++---
 ...lWalModeChangeDuringRebalancingSelfTest.java |  4 +--
 .../file/IgnitePdsDiskErrorsRecoveringTest.java |  5 ++--
 .../pagemem/PagesWriteThrottleSmokeTest.java    |  4 +--
 .../file/AlignedBuffersDirectFileIO.java        |  4 +--
 10 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
index 799a78c..76142bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -154,14 +154,14 @@ public class AsyncFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(byte[] buf, int off, int len) throws IOException {
+    @Override public int write(byte[] buf, int off, int len) throws IOException {
         ChannelOpFuture fut = holder.get();
         fut.reset();
 
         ch.write(ByteBuffer.wrap(buf, off, len), position, this, fut);
 
         try {
-            fut.getUninterruptibly();
+            return fut.getUninterruptibly();
         }
         catch (IgniteCheckedException e) {
             throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 822bd66..50568af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -120,9 +120,11 @@ public interface FileIO extends AutoCloseable {
      * @param off Start offset in the {@code buffer}.
      * @param len Number of bytes to write.
      *
+     * @return Number of written bytes.
+     * 
      * @throws IOException If some I/O error occurs.
      */
-    public void write(byte[] buf, int off, int len) throws IOException;
+    public int write(byte[] buf, int off, int len) throws IOException;
 
     /**
      * Allocates memory mapped buffer for this file with given size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index 683845b..9c38985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -72,8 +72,8 @@ public class FileIODecorator implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(byte[] buf, int off, int len) throws IOException {
-        delegate.write(buf, off, len);
+    @Override public int write(byte[] buf, int off, int len) throws IOException {
+        return delegate.write(buf, off, len);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 8f7454d..018ed27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -79,8 +79,8 @@ public class RandomAccessFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(byte[] buf, int off, int len) throws IOException {
-        ch.write(ByteBuffer.wrap(buf, off, len));
+    @Override public int write(byte[] buf, int off, int len) throws IOException {
+        return ch.write(ByteBuffer.wrap(buf, off, len));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
index 469cf3e..8194ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -110,7 +110,7 @@ public class UnzipFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(byte[] buf, int off, int len) throws IOException {
+    @Override public int write(byte[] buf, int off, int len) throws IOException {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index cb80961..a55f156 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -71,6 +71,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -98,6 +99,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -254,6 +256,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Events service */
     private final GridEventStorageManager evt;
 
+    /** Failure processor */
+    private final FailureProcessor failureProcessor;
+
     /** */
     private IgniteConfiguration igCfg;
 
@@ -360,6 +365,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         ioFactory = new RandomAccessFileIOFactory();
         walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
         evt = ctx.event();
+        failureProcessor = ctx.failure();
     }
 
     /**
@@ -1382,11 +1388,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
             int left = bytesCntToFormat;
 
-            if (mode == WALMode.FSYNC) {
+            if (mode == WALMode.FSYNC || mmap) {
                 while (left > 0) {
                     int toWrite = Math.min(FILL_BUF.length, left);
 
-                    fileIO.write(FILL_BUF, 0, toWrite);
+                    if (fileIO.write(FILL_BUF, 0, toWrite) < toWrite) {
+                        final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend WAL segment file: " +
+                            file.getName() + ". Probably disk is too busy, please check your device.");
+
+                        if (failureProcessor != null)
+                            failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
+
+                        throw ex;
+                    }
 
                     left -= toWrite;
                 }
@@ -2133,7 +2147,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                         int bytesRead;
                         while ((bytesRead = zis.read(arr)) > 0)
-                            io.write(arr, 0, bytesRead);
+                            if (io.write(arr, 0, bytesRead) < bytesRead) {
+                                final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend file: " +
+                                    unzipTmp.getName() + ". Probably disk is too busy, please check your device.");
+
+                                if (failureProcessor != null)
+                                    failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
+
+                                throw ex;
+                            }
                     }
 
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index aa6bb30..3a89352 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -604,7 +604,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
         }
 
         /** {@inheritDoc} */
-        @Override public void write(byte[] buf, int off, int len) throws IOException {
+        @Override public int write(byte[] buf, int off, int len) throws IOException {
             CountDownLatch latch = fileIOLatch.get();
 
             if (latch != null && Thread.currentThread().getName().contains("checkpoint"))
@@ -615,7 +615,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
                     throw new IgniteException(ex);
                 }
 
-            delegate.write(buf, off, len);
+            return delegate.write(buf, off, len);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
index 67a2eca..7efe29b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
@@ -429,11 +429,12 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void write(byte[] buf, int off, int len) throws IOException {
-            super.write(buf, off, len);
+        @Override public int write(byte[] buf, int off, int len) throws IOException {
+            final int num = super.write(buf, off, len);
             availableSpaceBytes.addAndGet(-len);
             if (availableSpaceBytes.get() < 0)
                 throw new IOException("Not enough space!");
+            return num;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index f0937e8..7ff1348 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -311,11 +311,11 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
                     return delegate.write(srcBuf, position);
                 }
 
-                @Override public void write(byte[] buf, int off, int len) throws IOException {
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
                     if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint"))
                         LockSupport.parkNanos(5_000_000);
 
-                    delegate.write(buf, off, len);
+                    return delegate.write(buf, off, len);
                 }
 
                 /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b965bb/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
index 0168d2a..da03a8d 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -455,8 +455,8 @@ public class AlignedBuffersDirectFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(byte[] buf, int off, int len) throws IOException {
-        write(ByteBuffer.wrap(buf, off, len));
+    @Override public int write(byte[] buf, int off, int len) throws IOException {
+        return write(ByteBuffer.wrap(buf, off, len));
     }
 
     /** {@inheritDoc} */


Mime
View raw message