ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [1/2] ignite git commit: IGNITE-7380 Implemented pluggable Direct IO - Fixes #3226.
Date Thu, 18 Jan 2018 10:58:25 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.4 1daa7c41b -> cf0080210


http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
new file mode 100644
index 0000000..b9d11cd
--- /dev/null
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.OpenOption;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * Direct native IO factory for block IO operations on aligned memory structures.<br>
+ * This limited functionality is used for page store operations.<br>
+ * <b>Note: </b> This type of IO not applicable for WAL or other files.<br>
<br>
+ * This IO tries to minimize cache effects of the I/O (page caching by OS). <br> <br>
+ * In general this will degrade performance, but it is useful in special
+ * situations, such as when applications do their own caching.<br>
+ */
+public class AlignedBuffersDirectFileIOFactory implements FileIOFactory {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Page size from durable memory. */
+    private final int pageSize;
+
+    /** Backup factory for files in case native is not available or not applicable. */
+    private final FileIOFactory backupFactory;
+
+    /** File system/os block size, negative value if library init was failed. */
+    private final int fsBlockSize;
+
+    /** Use backup factory, {@code true} if direct IO setup failed. */
+    private boolean useBackupFactory;
+
+    /** Thread local with buffers with capacity = one page {@code pageSize} and aligned using
{@code fsBlockSize}. */
+    private ThreadLocal<ByteBuffer> tlbOnePageAligned;
+
+    /**
+     * Managed aligned buffers. This collection is used to free buffers, an for checking
if buffer is known to be
+     * already aligned.
+     */
+    private final ConcurrentHashMap8<Long, Thread> managedAlignedBuffers = new ConcurrentHashMap8<>();
+
+    /**
+     * Creates direct native IO factory.
+     *
+     * @param log Logger.
+     * @param storePath Storage path, used to check FS settings.
+     * @param pageSize durable memory page size.
+     * @param backupFactory fallback factory if init failed.
+     */
+    public AlignedBuffersDirectFileIOFactory(
+        final IgniteLogger log,
+        final File storePath,
+        final int pageSize,
+        final FileIOFactory backupFactory) {
+        this.log = log;
+        this.pageSize = pageSize;
+        this.backupFactory = backupFactory;
+
+        useBackupFactory = true;
+        fsBlockSize = IgniteNativeIoLib.getFsBlockSize(storePath.getAbsolutePath(), log);
+
+        if(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIRECT_IO_ENABLED,
true)) {
+            if (log.isInfoEnabled())
+                log.info("Direct IO is explicitly disabled by system property");
+
+            return;
+        }
+
+        if (fsBlockSize > 0) {
+            int blkSize = fsBlockSize;
+
+            if (pageSize % blkSize != 0) {
+                U.warn(log, String.format("Unable to setup Direct IO for Ignite [pageSize=%d
bytes;" +
+                        " file system block size=%d]. For speeding up Ignite consider setting
%s.setPageSize(%d)." +
+                        " Direct IO is disabled",
+                    pageSize, blkSize, DataStorageConfiguration.class.getSimpleName(), blkSize));
+            }
+            else {
+                useBackupFactory = false;
+
+                tlbOnePageAligned = new ThreadLocal<ByteBuffer>() {
+                    @Override protected ByteBuffer initialValue() {
+                        return createManagedBuffer(pageSize);
+                    }
+                };
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Direct IO is enabled for block IO operations
on aligned memory structures." +
+                        " [block size = %d, durable memory page size = %d]", blkSize, pageSize));
+                }
+            }
+        }
+        else {
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Direct IO library is not available on current operating
system [%s]." +
+                    " Direct IO is not enabled.", System.getProperty("os.version")));
+            }
+        }
+
+    }
+
+    /**
+     * <b>Note: </b> Use only if {@link #isDirectIoAvailable()}.
+     *
+     * @param size buffer size to allocate.
+     * @return new byte buffer.
+     */
+    @NotNull ByteBuffer createManagedBuffer(int size) {
+        assert !useBackupFactory : "Direct IO is disabled, aligned managed buffer creation
is disabled now";
+        assert managedAlignedBuffers != null : "Direct buffers not available";
+
+        ByteBuffer allocate = AlignedBuffers.allocate(fsBlockSize, size).order(ByteOrder.nativeOrder());
+
+        managedAlignedBuffers.put(GridUnsafe.bufferAddress(allocate), Thread.currentThread());
+
+        return allocate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file) throws IOException {
+        return create(file, CREATE, READ, WRITE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+        if (useBackupFactory)
+            return backupFactory.create(file, modes);
+
+        return new AlignedBuffersDirectFileIO(fsBlockSize, pageSize, file, modes, tlbOnePageAligned,
managedAlignedBuffers, log);
+
+    }
+
+    /**
+     * @return {@code true} if Direct IO can be used on current OS and file system settings
+     */
+    boolean isDirectIoAvailable() {
+        return !useBackupFactory;
+    }
+
+    /**
+     * Managed aligned buffers and its associated threads. This collection is used to free
buffers, an for checking if
+     * buffer is known to be already aligned.
+     *
+     * @return map address->thread.
+     */
+    ConcurrentHashMap8<Long, Thread> managedAlignedBuffers() {
+        return managedAlignedBuffers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
new file mode 100644
index 0000000..47f1e6a
--- /dev/null
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Platform;
+import com.sun.jna.Pointer;
+import com.sun.jna.ptr.PointerByReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Native IO library based on *nix C library, enabled for Linux, kernel version >= 2.4.10.
<br>
+ * <br>
+ * Uses JNA library (https://github.com/java-native-access/jna) to access native calls. <br>
+ * <br>
+ */
+@SuppressWarnings({"OctalInteger", "WeakerAccess"})
+public class IgniteNativeIoLib {
+    /** Open for reading only. */
+    public static final int O_RDONLY = 00;
+
+    /** Open for writing only. */
+    public static final int O_WRONLY = 01;
+
+    /** Open for reading and writing. */
+    public static final int O_RDWR = 02;
+
+    /** File shall be created. If the file exists, this flag has no effect. */
+    public static final int O_CREAT = 0100;
+
+    /** If the file exists and is a regular file length shall be truncated to 0. */
+    public static final int O_TRUNC = 01000;
+
+    /** Try to minimize cache effects of the I/O to and from this file.  */
+    public static final int O_DIRECT = 040000;
+
+    /**
+     * Write operations on the file will complete according to the requirements of synchronized
I/O file integrity
+     * completion. By the time write(2) (or similar) returns, the output data and associated
file metadata have been
+     * transferred to the underlying hardware.
+     */
+    public static final int O_SYNC = 04000000;
+
+    /**
+     * The specified data will not be accessed in the near future. See fadvise.h and "man
2 posix_fadvise".
+    */
+    public static final int POSIX_FADV_DONTNEED = 4;
+
+    /** Flag for newly created files: user has read permission. */
+    public static final int S_IRUSR = 00400;
+
+    /** Flag for newly created files: user has write permission. */
+    public static final int S_IWUSR = 00200;
+
+    /** Flag for newly created files: group has read permission. */
+    public static final int S_IRGRP = 00040;
+
+    /** Flag for newly created files: others have read permission. */
+    public static final int S_IROTH = 00004;
+
+    /** Default access mask for newly created files. */
+    public static final int DEFAULT_OPEN_MODE = S_IRUSR | S_IWUSR | S_IROTH | S_IRGRP;
+
+    /** Invalid argument. */
+    public static final int E_INVAL = 22;
+
+    /** Seek option: set file offset to offset */
+    public static final int SEEK_SET = 0;
+
+    /** Seek option: change file position to offset */
+    public static final int SEEK_CUR = 1;
+
+    /** JNA library available and initialized. Always {@code false} for non linux systems.
*/
+    private static boolean jnaAvailable;
+
+    /** JNA library initialization exception. To be logged to Ignite logger later. */
+    @Nullable private static Exception ex;
+
+    static {
+        if (Platform.isLinux()) {
+            try {
+                if (checkLinuxVersion()) {
+                    Native.register(Platform.C_LIBRARY_NAME);
+                    jnaAvailable = true;
+                }
+                else
+                    jnaAvailable = false;
+            }
+            catch (Exception e) {
+                ex = e;
+                jnaAvailable = false;
+            }
+        }
+        else
+            jnaAvailable = false;
+    }
+
+    /**
+     * O_DIRECT  support was added under Linux in kernel version 2.4.10.
+     *
+     * @return {@code true} if O_DIRECT is supported, kernel version >= 2.4.10
+     */
+    private static boolean checkLinuxVersion() {
+        final String osVer = System.getProperty("os.version");
+
+        if (osVer == null)
+            return false;
+
+        List<Integer> verIntComps = new ArrayList<>();
+
+        for (StringTokenizer tokenizer = new StringTokenizer(osVer, ".-"); tokenizer.hasMoreTokens();
) {
+            String verComp = tokenizer.nextToken();
+
+            if (verComp.matches("\\d*"))
+                verIntComps.add(Integer.parseInt(verComp));
+        }
+
+        if (verIntComps.isEmpty())
+            return false;
+
+        final int verIdx = 0;
+        final int majorRevIdx = 1;
+        final int minorRevIdx = 2;
+
+        if (verIntComps.get(verIdx) > 2)
+            return true;
+        else if (verIntComps.get(verIdx) == 2) {
+            int compsCnt = verIntComps.size();
+
+            if (compsCnt > majorRevIdx && verIntComps.get(majorRevIdx) > 4)
+                return true;
+            else if (compsCnt > minorRevIdx
+                && verIntComps.get(majorRevIdx) == 4
+                && verIntComps.get(minorRevIdx) >= 10)
+                return true;
+        }
+        return false;
+    }
+
+
+    /**
+     * Calculate Lowest Common Multiplier.
+     * @param a first value.
+     * @param b second value.
+     */
+    private static long lcm(final long a, final long b) {
+        return (a * b) / gcf(a, b);
+    }
+
+    /**
+     * Calculate Greatest Common Factor.
+     * @param a first value.
+     * @param b second value.
+     */
+    private static long gcf(final long a, final long b) {
+        if (b == 0)
+            return a;
+        else
+            return gcf(b, a % b);
+    }
+
+    /**
+     * Determines FS and OS block size. Returns file system block size for use with storageDir
see "man 3 posix_memalign"
+     *
+     * @param storageDir storage path, base path to check (FS) configuration parameters.
+     * @param log Logger.
+     * @return <ul><li>FS block size to be used in Direct IO and memory alignments.</li>
+     * <li>or <tt>-1</tt> Operating System is not applicable for enabling
Direct IO.</li>
+     * <li>and <tt>-1</tt> if failed to determine block size.</li>
+     * <li>and <tt>-1</tt> if JNA is not available or init failed.</li>
</ul>
+     */
+    public static int getFsBlockSize(final String storageDir, final IgniteLogger log) {
+        if (ex != null) {
+            U.warn(log, "Failed to initialize O_DIRECT support at current OS: " + ex.getMessage(),
ex);
+
+            return -1;
+        }
+
+        if (!jnaAvailable)
+            return -1;
+
+        int fsBlockSize = -1;
+        int _PC_REC_XFER_ALIGN = 0x11;
+        int pcAlign = pathconf(storageDir, _PC_REC_XFER_ALIGN).intValue();
+
+        if (pcAlign > 0)
+            fsBlockSize = pcAlign;
+
+        int pageSize = getpagesize();
+
+        fsBlockSize = (int)lcm(fsBlockSize, pageSize);
+
+        // just being completely paranoid: (512 is the rule for 2.6+ kernels)
+        fsBlockSize = (int)lcm(fsBlockSize, 512);
+
+        if (log.isInfoEnabled())
+            log.info(String.format("Page size configuration for storage path [%s]: %d;" +
+                    " Linux memory page size: %d;" +
+                    " Selected FS block size : %d.",
+                storageDir, pcAlign, pageSize, fsBlockSize));
+
+        // lastly, a sanity check
+        if (fsBlockSize <= 0 || ((fsBlockSize & (fsBlockSize - 1)) != 0)) {
+            U.warn(log, "File system block size should be a power of two, was found to be
" + fsBlockSize +
+                " Disabling O_DIRECT support");
+
+            return -1;
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Selected FS block size : " + fsBlockSize);
+
+        return fsBlockSize;
+    }
+
+    /**
+     * @return Flag indicating JNA library available and initialized. Always {@code false}
for non linux systems.
+     */
+    public static boolean isJnaAvailable() {
+        return jnaAvailable;
+    }
+
+    /**
+     * Open a file. See "man 3 open".
+     *
+     * @param pathname pathname naming the file.
+     * @param flags flag/open options. Flags are constructed by a bitwise-inclusive OR of
flags.
+     * @param mode create file mode creation mask.
+     * @return file descriptor.
+     */
+    public static native int open(String pathname, int flags, int mode);
+
+    /**
+     * See "man 2 close".
+     *
+     * @param fd The file descriptor of the file to close.
+     * @return 0 on success, -1 on error.
+     */
+    public static native int close(int fd);
+
+    /**
+     * Writes up to {@code cnt} bytes to the buffer starting at {@code buf} to the file descriptor
{@code fd} at offset
+     * {@code offset}. The file offset is not changed. See "man 2 pwrite".
+     *
+     * @param fd file descriptor.
+     * @param buf pointer to buffer with data.
+     * @param cnt bytes to write.
+     * @param off position in file to write data.
+     * @return the number of bytes written. Note that is not an error for a successful call
to transfer fewer bytes than
+     * requested.
+     */
+    public static native NativeLong pwrite(int fd, Pointer buf, NativeLong cnt, NativeLong
off);
+
+    /**
+     * Writes up to {@code cnt} bytes to the buffer starting at {@code buf} to the file descriptor
{@code fd}.
+     * The file offset is changed. See "man 2 write".
+     *
+     * @param fd file descriptor.
+     * @param buf pointer to buffer with data.
+     * @param cnt bytes to write.
+     * @return the number of bytes written. Note that is not an error for a successful call
to transfer fewer bytes than
+     * requested.
+     */
+    public static native NativeLong write(int fd, Pointer buf, NativeLong cnt);
+
+    /**
+     * Reads up to {@code cnt} bytes from file descriptor {@code fd} at offset {@code off}
(from the start of the file)
+     * into the buffer starting at {@code buf}. The file offset is not changed. See "man
2 pread".
+     *
+     * @param fd file descriptor.
+     * @param buf pointer to buffer to place the data.
+     * @param cnt bytes to read.
+     * @return On success, the number of bytes read is returned (zero indicates end of file),
on error, -1 is returned,
+     * and errno is set appropriately.
+     */
+    public static native NativeLong pread(int fd, Pointer buf, NativeLong cnt, NativeLong
off);
+
+    /**
+     * Reads up to {@code cnt} bytes from file descriptor {@code fd} into the buffer starting
at {@code buf}. The file
+     * offset is changed. See "man 2 read".
+     *
+     * @param fd file descriptor.
+     * @param buf pointer to buffer to place the data.
+     * @param cnt bytes to read.
+     * @return On success, the number of bytes read is returned (zero indicates end of file),
on error, -1 is returned,
+     * and errno is set appropriately.
+     */
+    public static native NativeLong read(int fd, Pointer buf, NativeLong cnt);
+
+    /**
+     * Synchronize a file's in-core state with storage device. See "man 2 fsync".
+     * @param fd file descriptor.
+     * @return On success return zero. On error, -1 is returned, and errno is set appropriately.
+     */
+    public static native int fsync(int fd);
+
+    /**
+     * Allocates size bytes and places the address of the allocated memory in {@code memptr}.
+     * The address of the allocated memory will be a multiple of {@code alignment}.
+     *
+     * See "man 3 posix_memalign".
+     * @param memptr out memory pointer.
+     * @param alignment memory alignment,  must be a power of two and a multiple of sizeof(void
*).
+     * @param size size of buffer.
+     * @return returns zero on success, or one of the error values.
+     */
+    public static native int posix_memalign(PointerByReference memptr, NativeLong alignment,
NativeLong size);
+
+    /**
+     * Frees the memory space pointed to by ptr, which must have been returned by a previous
call to native allocation
+     * methods. POSIX requires that memory obtained from {@link #posix_memalign} can be freed
using free. See "man 3
+     * free".
+     *
+     * @param ptr pointer to free.
+     */
+    public static native void free(Pointer ptr);
+
+    /**
+     * Function returns a string that describes the error code passed in the argument {@code
errnum}. See "man 3
+     * strerror".
+     *
+     * @param errnum error code.
+     * @return displayable error information.
+     */
+    public static native String strerror(int errnum);
+
+    /**
+     * Return path (FS) configuration parameter value. <br>
+     * Helps to determine alignment restrictions, for example, on buffers used for direct
block device I/O. <br>
+     * POSIX specifies the pathconf(path,_PC_REC_XFER_ALIGN) call that tells what alignment
is needed.
+     *
+     * @param path base path to check settings.
+     * @param name variable name to query.
+     */
+    public static native NativeLong pathconf(String path, int name);
+
+    /**
+     * The function getpagesize() returns the number of bytes in a memory
+     * page, where "page" is a fixed-length block, the unit for memory
+     * allocation and file mapping
+     */
+    public static native int getpagesize();
+
+    /**
+     * Allows to announce an intention to access file data in a specific pattern in the future,
thus allowing the
+     * kernel to perform appropriate optimizations.
+     *
+     * The advice applies to a (not necessarily existent) region starting at
+     * {@code off} and extending for {@code len} bytes (or until the end of the file if len
is 0)
+     * within the file referred to by fd.
+     *
+     * See "man 2 posix_fadvise".
+     *
+     * @param fd file descriptor.
+     * @param off region start.
+     * @param len region end.
+     * @param flag advice (option) to apply.
+     * @return On success, zero is returned.  On error, an error number is returned.
+     */
+    public static native int posix_fadvise(int fd, long off, long len, int flag);
+
+    /**
+     * Causes regular file referenced by fd to be truncated to a size of precisely length
bytes.
+     *
+     * If the file previously was larger than this size, the extra data is lost.
+     * If the file previously was shorter, it is extended, and the extended part reads as
null bytes ('\0').
+     * The file offset is not changed.
+     *
+     * @param fd  file descriptor.
+     * @param len required length.
+     * @return On success, zero is returned. On error, -1 is returned, and errno is set appropriately.
+     */
+    public static native int ftruncate(int fd, long len);
+
+    /**
+     * Repositions the file offset of the open file description associated with the file
descriptor {@code fd}
+     * to the argument offset according to the directive {@code whence}
+     * @param fd file descriptor.
+     * @param off required position offset.
+     * @param whence position base.
+     * @return  On error, the value -1 is returned and errno is set to indicate the error.
+     */
+    public static native long lseek(int fd, long off, int whence);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java
new file mode 100644
index 0000000..32c63df
--- /dev/null
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import org.apache.ignite.plugin.IgnitePlugin;
+
+/**
+ * Plugin instance returned by {@link LinuxNativeIoPluginProvider#plugin()}. It exposes no API of its own;
+ * the native calls live in {@link IgniteNativeIoLib}.
+ */
+public class LinuxNativeIoPlugin implements IgnitePlugin {
+    // Intentionally empty: this plugin has no public API.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java
new file mode 100644
index 0000000..918ff5c
--- /dev/null
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import java.io.FileDescriptor;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Plugin provider for setting up {@link IgniteNativeIoLib}.
+ */
+public class LinuxNativeIoPluginProvider implements PluginProvider {
+    /**
+     * Managed buffers map from buffer address to the thread that requested the buffer. {@code null} when Direct IO
+     * was not set up, and reset to {@code null} once the buffers have been freed.
+     */
+    @Nullable private ConcurrentHashMap8<Long, Thread> managedBuffers;
+
+    /** Logger, taken from the grid instance in {@code start(PluginContext)}. */
+    private IgniteLogger log;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns a fixed, human-readable plugin name.
+     */
+    @Override public String name() {
+        return "Ignite Native I/O Plugin [Direct I/O]";
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No separate plugin version is reported: always returns an empty string.
+     */
+    @Override public String version() {
+        return "";
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns a fixed copyright string.
+     */
+    @Override public String copyright() {
+        return "Copyright(C) Apache Software Foundation";
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This plugin registers no extensions.
+     */
+    @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No per-cache provider is used: always returns {@code null}.
+     */
+    @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext ctx) {
+        final Ignite ignite = ctx.grid();
+
+        log = ignite.log();
+        managedBuffers = setupDirect((IgniteEx)ignite);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Frees the aligned buffers allocated for Direct IO, if any.
+     */
+    @Override public void stop(boolean cancel) {
+        freeDirectBuffers();
+    }
+
+    /**
+     * Free direct thread local buffer allocated for Direct IO user's threads.
+     */
+    private void freeDirectBuffers() {
+        ConcurrentHashMap8<Long, Thread> buffers = managedBuffers;
+
+        if (buffers == null)
+            return;
+
+        managedBuffers = null;
+
+        if (log.isDebugEnabled())
+            log.debug("Direct IO buffers to be freed: " + buffers.size());
+
+        for (Map.Entry<Long, Thread> next : buffers.entrySet()) {
+            Thread th = next.getValue();
+            Long addr = next.getKey();
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("Free Direct IO buffer [address=%d; Thread=%s; alive=%s]",
+                    addr, th != null ? th.getName() : "", th != null && th.isAlive()));
+
+            AlignedBuffers.free(addr);
+        }
+
+        buffers.clear();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Nothing to do: Direct IO is set up in {@code start(PluginContext)}.
+     */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Nothing to do: buffers are freed in {@code stop(boolean)}.
+     */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This plugin exchanges no discovery data: always returns {@code null}.
+     */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Received discovery data is ignored.
+     */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No validation is performed: every joining node is accepted.
+     */
+    @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePlugin plugin() {
+        return new LinuxNativeIoPlugin();
+    }
+
+    /**
+     * Switches the file page store of the starting node to Direct IO when possible.
+     * <p>
+     * Independently of Direct IO availability, registers a listener that advises the OS that newly created WAL
+     * files will not be accessed soon. This only happens when the WAL manager is a
+     * {@link FileWriteAheadLogManager} and JNA is available.
+     *
+     * @param ignite Ignite starting up.
+     * @return Managed aligned buffers and their associated threads. This collection is used to free buffers.
+     *      May return {@code null} when there is no file page store, Direct IO is unavailable or the database
+     *      manager is not a {@link GridCacheDatabaseSharedManager}.
+     */
+    @Nullable private ConcurrentHashMap8<Long, Thread> setupDirect(IgniteEx ignite) {
+        GridCacheSharedContext<Object, Object> cacheCtx = ignite.context().cache().context();
+        IgnitePageStoreManager ignitePageStoreMgr = cacheCtx.pageStore();
+
+        // Covers both "no page store at all" (null) and a non-file page store implementation.
+        if (!(ignitePageStoreMgr instanceof FilePageStoreManager))
+            return null;
+
+        final FilePageStoreManager pageStore = (FilePageStoreManager)ignitePageStoreMgr;
+        FileIOFactory backupIoFactory = pageStore.getPageStoreFileIoFactory();
+
+        final AlignedBuffersDirectFileIOFactory factory = new AlignedBuffersDirectFileIOFactory(
+            ignite.log(),
+            pageStore.workDir(),
+            pageStore.pageSize(),
+            backupIoFactory);
+
+        // Checked rather than blindly cast: any other WAL manager implementation simply skips the fadvise hook
+        // instead of failing plugin start with a ClassCastException.
+        if (cacheCtx.wal() instanceof FileWriteAheadLogManager && IgniteNativeIoLib.isJnaAvailable()) {
+            final FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)cacheCtx.wal();
+
+            walMgr.setCreateWalFileListener(new IgniteInClosure<FileIO>() {
+                @Override public void apply(FileIO fileIO) {
+                    adviceFileDontNeed(fileIO, walMgr.maxWalSegmentSize());
+                }
+            });
+        }
+
+        if (!factory.isDirectIoAvailable())
+            return null;
+
+        Object dbMgr = cacheCtx.database();
+
+        // The aligned per-thread page buffer can only be installed into GridCacheDatabaseSharedManager.
+        if (!(dbMgr instanceof GridCacheDatabaseSharedManager)) {
+            U.warn(log, "Direct IO will not be used: unsupported database manager [cls=" +
+                (dbMgr == null ? null : dbMgr.getClass().getName()) + ']');
+
+            return null;
+        }
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)dbMgr;
+
+        // Each thread lazily gets its own page-sized buffer allocated by the Direct IO factory.
+        db.setThreadBuf(new ThreadLocal<ByteBuffer>() {
+            @Override protected ByteBuffer initialValue() {
+                return factory.createManagedBuffer(pageStore.pageSize());
+            }
+        });
+
+        // The previous factory is passed along as the backup factory.
+        pageStore.setPageStoreFileIOFactories(factory, backupIoFactory);
+
+        return factory.managedAlignedBuffers();
+    }
+
+    /**
+     * Apply advice: The specified data will not be accessed in the near future.
+     * <p>
+     * Useful for WAL segments to indicate file content won't be loaded. Best-effort: only applied to
+     * {@link RandomAccessFileIO}; any failure is logged as a warning and otherwise ignored.
+     *
+     * @param fileIO File to advise on.
+     * @param size Expected size of the file (length of the advised range, starting at offset 0).
+     */
+    private void adviceFileDontNeed(FileIO fileIO, long size) {
+        if (!(fileIO instanceof RandomAccessFileIO))
+            return;
+
+        try {
+            // Reach the raw descriptor via reflection: RandomAccessFileIO.ch -> channel's fd -> FileDescriptor.fd.
+            FileChannel ch = U.field(fileIO, "ch");
+
+            FileDescriptor fd = U.field(ch, "fd");
+
+            int fdVal = U.field(fd, "fd");
+
+            int retVal = IgniteNativeIoLib.posix_fadvise(fdVal, 0, size, IgniteNativeIoLib.POSIX_FADV_DONTNEED);
+
+            // posix_fadvise returns the error number directly rather than setting errno.
+            if (retVal != 0) {
+                U.warn(log, "Unable to apply fadvice on WAL file descriptor [fd=" + fdVal + "]:" +
+                    IgniteNativeIoLib.strerror(retVal));
+            }
+        }
+        catch (Exception e) {
+            U.warn(log, "Unable to advice on WAL file descriptor: [" + e.getMessage() + "]", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
b/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
new file mode 100644
index 0000000..ee1ccd8
--- /dev/null
+++ b/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -0,0 +1 @@
+org.apache.ignite.internal.processors.cache.persistence.file.LinuxNativeIoPluginProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java
b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java
new file mode 100644
index 0000000..981e0d5
--- /dev/null
+++ b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import com.google.common.base.Strings;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Checks that Direct IO can be set up when no persistent store is configured.
+ */
+public class IgniteNativeIoWithNoPersistenceTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration configuration = super.getConfiguration(igniteInstanceName);
+
+        // Default data region; persistence is not enabled explicitly here.
+        configuration.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()));
+
+        return configuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks simple launch with native IO: the node starts and cache entries can be written and read back.
+     * Grids are stopped in {@link #afterTest()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDirectIoHandlesNoPersistentGrid() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.active(true);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache");
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, valueForKey(i));
+
+        for (int i = 0; i < 100; i++)
+            assertEquals(valueForKey(i), cache.get(i));
+    }
+
+    /**
+     * @param i Key.
+     * @return Value with extra data, which allows to verify that the value stored for the key is read back intact.
+     */
+    @NotNull private String valueForKey(int i) {
+        return Strings.repeat(Integer.toString(i), 10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java
b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java
new file mode 100644
index 0000000..48454ea
--- /dev/null
+++ b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.persistence.file.IgniteNativeIoWithNoPersistenceTest;
+
+/**
+ * Subset of {@link IgnitePdsTestSuite} tests, started with the direct-io jar in the classpath.
+ */
+public class IgnitePdsNativeIoTestSuite extends TestSuite {
+    /**
+     * @return Suite.
+     */
+    public static TestSuite suite() {
+        TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite (with Direct IO)");
+
+        IgnitePdsTestSuite.addRealPageStoreTests(suite);
+
+        // The Direct IO plugin must also cope with a node that has no persistent store.
+        suite.addTestSuite(IgniteNativeIoWithNoPersistenceTest.class);
+
+        return suite;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java
b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java
new file mode 100644
index 0000000..54dd7d3
--- /dev/null
+++ b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+
+/**
+ * Runs the tests registered by {@link IgnitePdsTestSuite2#addRealPageStoreTests(TestSuite)}, started with the
+ * direct-io jar in the classpath.
+ */
+public class IgnitePdsNativeIoTestSuite2 extends TestSuite {
+    /**
+     * @return Suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite 2 (Native IO)");
+
+        IgnitePdsTestSuite2.addRealPageStoreTests(suite);
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf008021/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1d8637c..eab1ec8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>modules/tools</module>
         <module>modules/core</module>
         <module>modules/dev-utils</module>
+        <module>modules/direct-io</module>
         <module>modules/hadoop</module>
         <module>modules/extdata/p2p</module>
         <module>modules/extdata/uri</module>


Mime
View raw message