Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0D13F200D12 for ; Fri, 22 Sep 2017 13:21:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0ABE71609A7; Fri, 22 Sep 2017 11:21:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D92801609E9 for ; Fri, 22 Sep 2017 13:21:42 +0200 (CEST) Received: (qmail 67375 invoked by uid 500); 22 Sep 2017 11:21:42 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 67266 invoked by uid 99); 22 Sep 2017 11:21:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 11:21:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EF66EF582F; Fri, 22 Sep 2017 11:21:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 22 Sep 2017 11:21:44 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/35] ignite git commit: IGNITE-6228 Avoid closing page store by thread interruption. Fixes #2715 archived-at: Fri, 22 Sep 2017 11:21:45 -0000 IGNITE-6228 Avoid closing page store by thread interruption. 
Fixes #2715 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e8379aea Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8379aea Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8379aea Branch: refs/heads/ignite-3479 Commit: e8379aeae41fda6fa9fdad187dbbb18f3e92a9cb Parents: f8ae8f9 Author: Alexei Scherbakov Authored: Thu Sep 21 17:40:16 2017 +0300 Committer: Alexey Goncharuk Committed: Thu Sep 21 18:05:45 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../PersistentStoreConfiguration.java | 6 +- .../GridCacheDatabaseSharedManager.java | 7 +- .../cache/persistence/file/AsyncFileIO.java | 218 +++++++++++++++++++ .../persistence/file/AsyncFileIOFactory.java | 52 +++++ .../cache/persistence/file/FileIOFactory.java | 25 ++- .../cache/persistence/file/FilePageStore.java | 5 +- .../file/FileVersionCheckingFactory.java | 2 +- .../persistence/file/RandomAccessFileIO.java | 48 ++-- .../file/RandomAccessFileIOFactory.java | 14 +- .../wal/AbstractWalRecordsIterator.java | 2 +- .../cache/persistence/wal/FileInput.java | 7 + .../wal/FileWriteAheadLogManager.java | 8 +- .../reader/StandaloneWalRecordsIterator.java | 4 +- .../internal/util/future/GridFutureAdapter.java | 16 ++ .../resources/META-INF/classnames.properties | 2 + .../file/IgnitePdsThreadInterruptionTest.java | 205 +++++++++++++++++ .../db/wal/IgniteWalFlushFailoverTest.java | 22 +- .../db/wal/crc/IgniteDataIntegrityTests.java | 39 ++-- .../development/utils/IgniteWalConverter.java | 1 - .../IgnitePdsWithIndexingCoreTestSuite.java | 2 + 21 files changed, 606 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index ec79026..5f1839b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -714,6 +714,11 @@ public final class IgniteSystemProperties { "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD"; /** + * If this property is set, then Ignite will use Async File IO factory by default. + */ + public static final String IGNITE_USE_ASYNC_FILE_IO_FACTORY = "IGNITE_USE_ASYNC_FILE_IO_FACTORY"; + + /** * If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records * will be logged to WAL. * http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index 888bf42..abca5a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -17,6 +17,8 @@ package org.apache.ignite.configuration; import java.io.Serializable; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.S; @@ 
-137,7 +139,9 @@ public class PersistentStoreConfiguration implements Serializable { private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES; /** Factory to provide I/O interface for files */ - private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory(); + private FileIOFactory fileIOFactory = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, false) ? + new AsyncFileIOFactory() : new RandomAccessFileIOFactory(); /** * Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5a772b5..5f03d8f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -143,6 +143,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.nio.file.StandardOpenOption.READ; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; @@ -744,7 +745,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param partFile Partition file. 
*/ private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException { - try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile(), "r")) { + try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile())) { int minimalHdr = FilePageStore.HEADER_SIZE; if (fileIO.size() < minimalHdr) @@ -964,7 +965,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointLock.readLock().unlock(); try { - checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.get(); + checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.getUninterruptibly(); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to wait for checkpoint begin.", e); @@ -1394,7 +1395,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException { buf.position(0); - try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), StandardOpenOption.READ)) { + try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) { ch.read(buf); buf.flip(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java new file mode 100644 index 0000000..8fad7a5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.OpenOption; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; + +/** + * File I/O implementation based on {@link AsynchronousFileChannel}. + */ +public class AsyncFileIO implements FileIO { + /** + * File channel associated with {@code file} + */ + private final AsynchronousFileChannel ch; + + /** + * Channel's position. + */ + private volatile long position; + + /** */ + private final ThreadLocal holder; + + /** */ + private GridConcurrentHashSet asyncFuts = new GridConcurrentHashSet<>(); + + /** + * Creates I/O implementation for specified {@code file} + * @param file Random access file + * @param modes Open modes. + */ + public AsyncFileIO(File file, ThreadLocal holder, OpenOption... 
modes) throws IOException { + this.ch = AsynchronousFileChannel.open(file.toPath(), modes); + + this.holder = holder; + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + return position; + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + this.position = newPosition; + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + ch.read(destinationBuffer, position, this, fut); + + try { + return fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + ch.read(destinationBuffer, position, null, fut); + + try { + return fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + finally { + asyncFuts.remove(fut); + } + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buffer, int offset, int length) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + ch.read(ByteBuffer.wrap(buffer, offset, length), position, this, fut); + + try { + return fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + ch.write(sourceBuffer, position, this, fut); + + try { + return fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + asyncFuts.add(fut); + + ch.write(sourceBuffer, position, null, 
fut); + + try { + return fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + finally { + asyncFuts.remove(fut); + } + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buffer, int offset, int length) throws IOException { + ChannelOpFuture fut = holder.get(); + fut.reset(); + + ch.write(ByteBuffer.wrap(buffer, offset, length), position, this, fut); + + try { + fut.getUninterruptibly(); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + ch.force(false); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + return ch.size(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + ch.truncate(0); + + this.position = 0; + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + for (ChannelOpFuture asyncFut : asyncFuts) { + try { + asyncFut.getUninterruptibly(); // Ignore interrupts while waiting for channel close. + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + ch.close(); + } + + /** */ + static class ChannelOpFuture extends GridFutureAdapter implements CompletionHandler { + /** {@inheritDoc} */ + @Override public void completed(Integer result, AsyncFileIO attachment) { + if (attachment != null) { + if (result != -1) + attachment.position += result; + } + + // Release waiter and allow next operation to begin. 
+ super.onDone(result, null); + } + + /** {@inheritDoc} */ + @Override public void failed(Throwable exc, AsyncFileIO attachment) { + super.onDone(exc); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java new file mode 100644 index 0000000..0fb3052 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIOFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.OpenOption; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; + +/** + * File I/O factory which uses {@link AsynchronousFileChannel} based implementation of FileIO. + */ +public class AsyncFileIOFactory implements FileIOFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, READ, WRITE); + } + + /** */ + private ThreadLocal holder = new ThreadLocal() { + @Override protected AsyncFileIO.ChannelOpFuture initialValue() { + return new AsyncFileIO.ChannelOpFuture(); + } + }; + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... 
modes) throws IOException { + return new AsyncFileIO(file, holder, modes); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java index 0ffc653..c3a75f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java @@ -20,26 +20,29 @@ package org.apache.ignite.internal.processors.cache.persistence.file; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.nio.file.OpenOption; +/** + * {@link FileIO} factory definition. + */ public interface FileIOFactory extends Serializable { - /** - * Creates I/O interface for file with default I/O mode + * Creates I/O interface for file with default I/O mode. * - * @param file File - * @return File I/O interface - * @throws IOException If I/O interface creation was failed + * @param file File. + * @return File I/O interface. + * @throws IOException If I/O interface creation was failed. */ - FileIO create(File file) throws IOException; + public FileIO create(File file) throws IOException; /** - * Creates I/O interface for file with specified mode + * Creates I/O interface for file with specified mode. * * @param file File - * @param mode I/O mode in - * @return File I/O interface - * @throws IOException If I/O interface creation was failed + * @param modes Open modes. + * @return File I/O interface. + * @throws IOException If I/O interface creation was failed. 
*/ - FileIO create(File file, String mode) throws IOException; + public FileIO create(File file, OpenOption... modes) throws IOException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 6052a7c..98764a2 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -35,6 +35,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDat import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.util.typedef.internal.U; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; /** @@ -400,7 +403,7 @@ public class FilePageStore implements PageStore { IgniteCheckedException err = null; try { - this.fileIO = fileIO = ioFactory.create(cfgFile, "rw"); + this.fileIO = fileIO = ioFactory.create(cfgFile, CREATE, READ, WRITE); if (cfgFile.length() == 0) allocated.set(initFile()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java index 53bd802..40870dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java @@ -55,7 +55,7 @@ public class FileVersionCheckingFactory implements FilePageStoreFactory { if (!file.exists()) return createPageStore(type, file, latestVersion()); - try (FileIO fileIO = fileIOFactory.create(file, "r")) { + try (FileIO fileIO = fileIOFactory.create(file)) { int minHdr = FilePageStore.HEADER_SIZE; if (fileIO.size() < minHdr) http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java index 73a560a..55849fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java @@ -17,94 +17,88 @@ package org.apache.ignite.internal.processors.cache.persistence.file; +import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.OpenOption; /** - * File I/O implementation based on {@code java.io.RandomAccessFile}. 
+ * File I/O implementation based on {@link FileChannel}. */ public class RandomAccessFileIO implements FileIO { - - /** - * Random access file associated with this I/O - */ - private final RandomAccessFile file; - /** - * File channel associated with {@code file} + * File channel. */ - private final FileChannel channel; + private final FileChannel ch; /** * Creates I/O implementation for specified {@code file} * - * @param file Random access file + * @param file File. + * @param modes Open modes. */ - public RandomAccessFileIO(RandomAccessFile file) { - this.file = file; - this.channel = file.getChannel(); + public RandomAccessFileIO(File file, OpenOption... modes) throws IOException { + ch = FileChannel.open(file.toPath(), modes); } /** {@inheritDoc} */ @Override public long position() throws IOException { - return channel.position(); + return ch.position(); } /** {@inheritDoc} */ @Override public void position(long newPosition) throws IOException { - channel.position(newPosition); + ch.position(newPosition); } /** {@inheritDoc} */ @Override public int read(ByteBuffer destinationBuffer) throws IOException { - return channel.read(destinationBuffer); + return ch.read(destinationBuffer); } /** {@inheritDoc} */ @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { - return channel.read(destinationBuffer, position); + return ch.read(destinationBuffer, position); } /** {@inheritDoc} */ @Override public int read(byte[] buffer, int offset, int length) throws IOException { - return file.read(buffer, offset, length); + return ch.read(ByteBuffer.wrap(buffer, offset, length)); } /** {@inheritDoc} */ @Override public int write(ByteBuffer sourceBuffer) throws IOException { - return channel.write(sourceBuffer); + return ch.write(sourceBuffer); } /** {@inheritDoc} */ @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { - return channel.write(sourceBuffer, position); + return ch.write(sourceBuffer, 
position); } /** {@inheritDoc} */ @Override public void write(byte[] buffer, int offset, int length) throws IOException { - file.write(buffer, offset, length); + ch.write(ByteBuffer.wrap(buffer, offset, length)); } /** {@inheritDoc} */ @Override public void force() throws IOException { - channel.force(false); + ch.force(false); } /** {@inheritDoc} */ @Override public long size() throws IOException { - return channel.size(); + return ch.size(); } /** {@inheritDoc} */ @Override public void clear() throws IOException { - channel.position(0); - file.setLength(0); + ch.truncate(0); } /** {@inheritDoc} */ @Override public void close() throws IOException { - file.close(); + ch.close(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java index 6b731f2..856ba1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java @@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.file; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; +import java.nio.file.OpenOption; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; /** * File I/O factory which provides RandomAccessFileIO implementation of FileIO. 
@@ -30,13 +34,11 @@ public class RandomAccessFileIOFactory implements FileIOFactory { /** {@inheritDoc} */ @Override public FileIO create(File file) throws IOException { - return create(file, "rw"); + return create(file, CREATE, READ, WRITE); } /** {@inheritDoc} */ - @Override public FileIO create(File file, String mode) throws IOException { - RandomAccessFile rf = new RandomAccessFile(file, mode); - - return new RandomAccessFileIO(rf); + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + return new RandomAccessFileIO(file, modes); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 2749d5c..d5a2555 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -258,7 +258,7 @@ public abstract class AbstractWalRecordsIterator @Nullable final FileWALPointer start) throws IgniteCheckedException, FileNotFoundException { try { - FileIO fileIO = ioFactory.create(desc.file, "r"); + FileIO fileIO = ioFactory.create(desc.file); try { FileInput in = new FileInput(fileIO, buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java index 74edbfa..3b20fce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java @@ -63,6 +63,13 @@ public final class FileInput implements ByteBufferBackedDataInput { } /** + * File I/O. + */ + public FileIO io() { + return io; + } + + /** * Clear buffer. */ private void clearBuffer() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 18584a8..87069d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -79,6 +79,10 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; + /** * File WAL manager. 
*/ @@ -940,7 +944,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); - try (FileIO fileIO = ioFactory.create(file, "rw")) { + try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) { int left = psCfg.getWalSegmentSize(); if (mode == WALMode.DEFAULT) { @@ -1365,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Files.move(dstTmpFile.toPath(), dstFile.toPath()); if (mode == WALMode.DEFAULT) { - try (FileIO f0 = ioFactory.create(dstFile, "rw")) { + try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) { f0.force(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 900aab5..c92d572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -181,7 +181,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** * This methods checks all provided files to be correct WAL segment. - * Header record and its position is checked. WAL position is used to deremine real index. + * Header record and its position is checked. WAL position is used to determine real index. * File index from file name is ignored. 
* * @param allFiles files to scan @@ -202,7 +202,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { FileWALPointer ptr; try ( - FileIO fileIO = ioFactory.create(file, "r"); + FileIO fileIO = ioFactory.create(file); ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) ) { final DataInput in = new FileInput(fileIO, buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 323babd..f8c0b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -499,6 +500,21 @@ public class GridFutureAdapter implements IgniteInternalFuture { } /** + * Resets future for subsequent reuse. + */ + public void reset() { + final Object oldState = state; + + if (oldState == INIT) + return; + + if (!isDone(oldState)) + throw new IgniteException("Illegal state"); + + compareAndSetState(oldState, INIT); + } + + /** * Callback to notify that future is cancelled. * * @return {@code True} if cancel flag was set by this call. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 1931a93..d91eef4 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2091,3 +2091,5 @@ org.apache.ignite.transactions.TransactionRollbackException org.apache.ignite.transactions.TransactionState org.apache.ignite.transactions.TransactionTimeoutException org.apache.ignite.util.AttributeNodeFilter +org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIO +org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java new file mode 100644 index 0000000..aab569a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.file; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Test that interruptions of writing threads do not affect PDS. + */ +public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { + /** */ + private static final int PAGE_SIZE = 1 << 12; // 4096 + + /** */ + public static final int THREADS_CNT = 1; + + /** + * Cache name. 
+ */ + private final String cacheName = "cache"; + + /** */ + private volatile boolean stop = false; + + /** + * {@inheritDoc} + */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPersistentStoreConfiguration(storeConfiguration()); + + cfg.setMemoryConfiguration(memoryConfiguration()); + + cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName)); + + return cfg; + } + + /** + * @return Store config. + */ + private PersistentStoreConfiguration storeConfiguration() { + PersistentStoreConfiguration cfg = new PersistentStoreConfiguration(); + + cfg.setWalMode(WALMode.LOG_ONLY); + + cfg.setWalFsyncDelayNanos(0); + + cfg.setFileIOFactory(new AsyncFileIOFactory()); + + return cfg; + } + + /** + * @return Memory config. + */ + private MemoryConfiguration memoryConfiguration() { + final MemoryConfiguration memCfg = new MemoryConfiguration(); + + MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); + // memPlcCfg.setPageEvictionMode(RANDOM_LRU); TODO Fix NPE on start. + memPlcCfg.setName("dfltMemPlc"); + + memCfg.setPageSize(PAGE_SIZE); + memCfg.setConcurrencyLevel(1); + memCfg.setMemoryPolicies(memPlcCfg); + memCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + return memCfg; + } + + /** + * {@inheritDoc} + */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + deleteWorkFiles(); + } + + /** + * {@inheritDoc} + */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + + deleteWorkFiles(); + } + + /** + * Tests interruptions on WAL write. 
+ * + * @throws Exception + */ + public void testInterruptsOnWALWrite() throws Exception { + final Ignite ignite = startGrid(); + + ignite.active(true); + + final int valLen = 8192; + + final byte[] payload = new byte[valLen]; + + final int maxKey = 100_000; + + Thread[] workers = new Thread[THREADS_CNT]; + + final AtomicReference fail = new AtomicReference<>(); + + Runnable clo = new Runnable() { + @Override public void run() { + IgniteCache cache = ignite.cache(cacheName); + + while (!stop) + cache.put(ThreadLocalRandom8.current().nextInt(maxKey), payload); + } + }; + + for (int i = 0; i < workers.length; i++) { + workers[i] = new Thread(clo); + workers[i].setName("writer-" + i); + workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread t, Throwable e) { + fail.compareAndSet(null, e); + } + }); + } + + for (Thread worker : workers) + worker.start(); + + Thread.sleep(3_000); + + // Interrupts should not affect writes. + for (Thread worker : workers) + worker.interrupt(); + + Thread.sleep(3_000); + + stop = true; + + for (Thread worker : workers) + worker.join(); + + Throwable t = fail.get(); + + assert t == null : t; + + IgniteCache cache = ignite.cache(cacheName); + + int verifiedKeys = 0; + + // Post check. + for (int i = 0; i < maxKey; i++) { + byte[] val = (byte[]) cache.get(i); + + if (val != null) { + assertEquals("Illegal length", valLen, val.length); + + verifiedKeys++; + } + } + + log.info("Verified keys: " + verifiedKeys); + } + + /** + * @throws IgniteCheckedException If fail. 
+ */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java index cad10ae..048e8bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java @@ -42,12 +42,16 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import java.nio.file.OpenOption; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; /** * */ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { - /** */ private static final String TEST_CACHE = "testCache"; @@ -168,22 +172,22 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { private static class FailingFileIOFactory implements FileIOFactory { private static final long serialVersionUID = 0L; + /** */ private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); - @Override - public FileIO create(File file) throws IOException { - return create(file, "rw"); 
+ /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, READ, WRITE); } - @Override - public FileIO create(File file, String mode) throws IOException { - FileIO delegate = delegateFactory.create(file, mode); + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegate = delegateFactory.create(file, modes); return new FileIODecorator(delegate) { int writeAttempts = 2; - @Override - public int write(ByteBuffer sourceBuffer) throws IOException { + @Override public int write(ByteBuffer sourceBuffer) throws IOException { if (--writeAttempts == 0) throw new RuntimeException("Test exception. Unable to write to file."); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java index e4874d9..3d52507 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java @@ -21,11 +21,11 @@ import junit.framework.TestCase; import java.io.EOFException; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.concurrent.ThreadLocalRandom; -import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; 
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; @@ -38,9 +38,6 @@ public class IgniteDataIntegrityTests extends TestCase { /** File input. */ private FileInput fileInput; - /** Random access file. */ - private RandomAccessFile randomAccessFile; - /** Buffer expander. */ private ByteBufferExpander expBuf; @@ -51,13 +48,13 @@ public class IgniteDataIntegrityTests extends TestCase { File file = File.createTempFile("integrity", "dat"); file.deleteOnExit(); - randomAccessFile = new RandomAccessFile(file, "rw"); - expBuf = new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN); + FileIOFactory factory = new RandomAccessFileIOFactory(); + fileInput = new FileInput( - new RandomAccessFileIO(randomAccessFile), - expBuf + factory.create(file), + expBuf ); ByteBuffer buf = ByteBuffer.allocate(1024); @@ -71,13 +68,15 @@ public class IgniteDataIntegrityTests extends TestCase { buf.putInt(PureJavaCrc32.calcCrc32(buf, 12)); } - randomAccessFile.write(buf.array()); - randomAccessFile.getFD().sync(); + buf.rewind(); + + fileInput.io().write(buf); + fileInput.io().force(); } /** {@inheritDoc} */ @Override protected void tearDown() throws Exception { - randomAccessFile.close(); + fileInput.io().close(); expBuf.close(); } @@ -177,22 +176,24 @@ public class IgniteDataIntegrityTests extends TestCase { */ private void toggleOneRandomBit(int rangeFrom, int rangeTo) throws IOException { int pos = ThreadLocalRandom.current().nextInt(rangeFrom, rangeTo); - randomAccessFile.seek(pos); + fileInput.io().position(pos); + + byte[] buf = new byte[1]; - byte b = randomAccessFile.readByte(); + fileInput.io().read(buf, 0, 1); - b ^= (1 << 3); + buf[0] ^= (1 << 3); - 
randomAccessFile.seek(pos); - randomAccessFile.writeByte(b); - randomAccessFile.getFD().sync(); + fileInput.io().position(pos); + fileInput.io().write(buf, 0, 1); + fileInput.io().force(); } /** * */ private void checkIntegrity() throws Exception { - randomAccessFile.seek(0); + fileInput.io().position(0); for (int i = 0; i < 1024 / 16; i++) { try(FileInput.Crc32CheckingFileInput in = fileInput.startRead(false)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java ---------------------------------------------------------------------- diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java index b7c3cb7..f3268d9 100644 --- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java +++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java @@ -21,7 +21,6 @@ import java.io.File; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; http://git-wip-us.apache.org/repos/asf/ignite/blob/e8379aea/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index ae8ea18..cfbe2e0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMulti import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest; +import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest; @@ -53,6 +54,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(IgnitePdsBinaryMetadataOnClusterRestartTest.class); suite.addTestSuite(IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class); + suite.addTestSuite(IgnitePdsThreadInterruptionTest.class); return suite; }