hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1190066 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/ src/main/java/org/apache/hadoop/io/nativeio/ src/main/native/ src/main/native/src/org/apache/hadoop/io/nativeio/ s...
Date Thu, 27 Oct 2011 22:19:02 GMT
Author: todd
Date: Thu Oct 27 22:19:01 2011
New Revision: 1190066

URL: http://svn.apache.org/viewvc?rev=1190066&view=rev
Log:
HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add ReadaheadPool infrastructure
for use in HDFS and MR. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/configure.ac
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Thu
Oct 27 22:19:01 2011
@@ -428,6 +428,9 @@ Release 0.23.0 - Unreleased
     HADOOP-7445. Implement bulk checksum verification using efficient native
     code. (todd)
 
+    HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add
+    ReadaheadPool infrastructure for use in HDFS and MR. (todd)
+
   BUG FIXES
 
     HADOOP-7630. hadoop-metrics2.properties should have a property *.period 

Added: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java?rev=1190066&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
(added)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
Thu Oct 27 22:19:01 2011
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages a pool of threads which can issue readahead requests on file descriptors.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReadaheadPool {
+  static final Log LOG = LogFactory.getLog(ReadaheadPool.class);
+  private static final int POOL_SIZE = 4;
+  private static final int MAX_POOL_SIZE = 16;
+  private static final int CAPACITY = 1024;
+  private final ThreadPoolExecutor pool;
+  
+  private static ReadaheadPool instance;
+
+  /**
+   * Return the singleton instance for the current process.
+   */
+  public static ReadaheadPool getInstance() {
+    synchronized (ReadaheadPool.class) {
+      if (instance == null && NativeIO.isAvailable()) {
+        instance = new ReadaheadPool();
+      }
+      return instance;
+    }
+  }
+  
+  private ReadaheadPool() {
+    pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<Runnable>(CAPACITY));
+    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
+    pool.setThreadFactory(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("Readahead Thread #%d")
+      .build());
+  }
+
+  /**
+   * Issue a request to readahead on the given file descriptor.
+   * 
+   * @param identifier a textual identifier that will be used in error
+   * messages (e.g. the file name)
+   * @param fd the file descriptor to read ahead
+   * @param curPos the current offset at which reads are being issued
+   * @param readaheadLength the configured length to read ahead
+   * @param maxOffsetToRead the maximum offset that will be readahead
+   *        (useful if, for example, only some segment of the file is
+   *        requested by the user). Pass {@link Long#MAX_VALUE} to allow
+   *        readahead to the end of the file.
+   * @param lastReadahead the result returned by the previous invocation
+   *        of this function on this file descriptor, or null if this is
+   *        the first call
+   * @return an object representing this outstanding request, or null
+   *        if no readahead was performed
+   */
+  public ReadaheadRequest readaheadStream(
+      String identifier,
+      FileDescriptor fd,
+      long curPos,
+      long readaheadLength,
+      long maxOffsetToRead,
+      ReadaheadRequest lastReadahead) {
+    
+    Preconditions.checkArgument(curPos <= maxOffsetToRead,
+        "Readahead position %s higher than maxOffsetToRead %s",
+        curPos, maxOffsetToRead);
+
+    if (readaheadLength <= 0) {
+      return null;
+    }
+    
+    long lastOffset = Long.MIN_VALUE;
+    
+    if (lastReadahead != null) {
+      lastOffset = lastReadahead.getOffset();
+    }
+
+    // trigger each readahead when we have reached the halfway mark
+    // in the previous readahead. This gives the system time
+    // to satisfy the readahead before we start reading the data.
+    long nextOffset = lastOffset + readaheadLength / 2; 
+    if (curPos >= nextOffset) {
+      // cancel any currently pending readahead, to avoid
+      // piling things up in the queue. Each reader should have at most
+      // one outstanding request in the queue.
+      if (lastReadahead != null) {
+        lastReadahead.cancel();
+        lastReadahead = null;
+      }
+      
+      long length = Math.min(readaheadLength,
+          maxOffsetToRead - curPos);
+
+      if (length <= 0) {
+        // we've reached the end of the stream
+        return null;
+      }
+      
+      return submitReadahead(identifier, fd, curPos, length);
+    } else {
+      return lastReadahead;
+    }
+  }
+      
+  /**
+   * Submit a request to readahead on the given file descriptor.
+   * @param identifier a textual identifier used in error messages, etc.
+   * @param fd the file descriptor to readahead
+   * @param off the offset at which to start the readahead
+   * @param len the number of bytes to read
+   * @return an object representing this pending request
+   */
+  public ReadaheadRequest submitReadahead(
+      String identifier, FileDescriptor fd, long off, long len) {
+    ReadaheadRequestImpl req = new ReadaheadRequestImpl(
+        identifier, fd, off, len);
+    pool.execute(req);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("submit readahead: " + req);
+    }
+    return req;
+  }
+  
+  /**
+   * An outstanding readahead request that has been submitted to
+   * the pool. This request may be pending or may have been
+   * completed.
+   */
+  public interface ReadaheadRequest {
+    /**
+     * Cancels the request for readahead. This should be used
+     * if the reader no longer needs the requested data, <em>before</em>
+     * closing the related file descriptor.
+     * 
+     * It is safe to use even if the readahead request has already
+     * been fulfilled.
+     */
+    public void cancel();
+    
+    /**
+     * @return the requested offset
+     */
+    public long getOffset();
+
+    /**
+     * @return the requested length
+     */
+    public long getLength();
+  }
+  
+  private static class ReadaheadRequestImpl implements Runnable, ReadaheadRequest {
+    private final String identifier;
+    private final FileDescriptor fd;
+    private final long off, len;
+    private volatile boolean canceled = false;
+    
+    private ReadaheadRequestImpl(String identifier, FileDescriptor fd, long off, long len) {
+      this.identifier = identifier;
+      this.fd = fd;
+      this.off = off;
+      this.len = len;
+    }
+    
+    public void run() {
+      if (canceled) return;
+      // There's a very narrow race here that the file will close right at
+      // this instant. But if that happens, we'll likely receive an EBADF
+      // error below, and see that it's canceled, ignoring the error.
+      // It's also possible that we'll end up requesting readahead on some
+      // other FD, which may be wasted work, but won't cause a problem.
+      try {
+        NativeIO.posixFadviseIfPossible(fd, off, len,
+            NativeIO.POSIX_FADV_WILLNEED);
+      } catch (IOException ioe) {
+        if (canceled) {
+          // no big deal - the reader canceled the request and closed
+          // the file.
+          return;
+        }
+        LOG.warn("Failed readahead on " + identifier,
+            ioe);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      canceled = true;
+      // We could attempt to remove it from the work queue, but that would
+      // add complexity. In practice, the work queues remain very short,
+      // so removing canceled requests has no gain.
+    }
+
+    @Override
+    public long getOffset() {
+      return off;
+    }
+
+    @Override
+    public long getLength() {
+      return len;
+    }
+
+    @Override
+    public String toString() {
+      return "ReadaheadRequestImpl [identifier='" + identifier + "', fd=" + fd
+          + ", off=" + off + ", len=" + len + "]";
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
Thu Oct 27 22:19:01 2011
@@ -46,10 +46,41 @@ public class NativeIO {
   public static final int O_FSYNC = O_SYNC;
   public static final int O_NDELAY = O_NONBLOCK;
 
+  // Flags for posix_fadvise() from bits/fcntl.h
+  /* No further special treatment.  */
+  public static final int POSIX_FADV_NORMAL = 0; 
+  /* Expect random page references.  */
+  public static final int POSIX_FADV_RANDOM = 1; 
+  /* Expect sequential page references.  */
+  public static final int POSIX_FADV_SEQUENTIAL = 2; 
+  /* Will need these pages.  */
+  public static final int POSIX_FADV_WILLNEED = 3; 
+  /* Don't need these pages.  */
+  public static final int POSIX_FADV_DONTNEED = 4; 
+  /* Data will be accessed once.  */
+  public static final int POSIX_FADV_NOREUSE = 5; 
+
+
+  /* Wait upon writeout of all pages
+     in the range before performing the
+     write.  */
+  public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+  /* Initiate writeout of all those
+     dirty pages in the range which are
+     not presently under writeback.  */
+  public static final int SYNC_FILE_RANGE_WRITE = 2;
+
+  /* Wait upon writeout of all pages in
+     the range after performing the
+     write.  */
+  public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
   private static final Log LOG = LogFactory.getLog(NativeIO.class);
 
   private static boolean nativeLoaded = false;
   private static boolean workaroundNonThreadSafePasswdCalls = false;
+  private static boolean fadvisePossible = true;
+  private static boolean syncFileRangePossible = true;
 
   static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
     "hadoop.workaround.non.threadsafe.getpwuid";
@@ -88,9 +119,58 @@ public class NativeIO {
   /** Wrapper around chmod(2) */
   public static native void chmod(String path, int mode) throws IOException;
 
+  /** Wrapper around posix_fadvise(2) */
+  static native void posix_fadvise(
+    FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
+
+  /** Wrapper around sync_file_range(2) */
+  static native void sync_file_range(
+    FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
+
   /** Initialize the JNI method ID and class ID cache */
   private static native void initNative();
 
+  /**
+   * Call posix_fadvise on the given file descriptor. See the manpage
+   * for this syscall for more information. On systems where this
+   * call is not available, does nothing.
+   *
+   * @throws NativeIOException if there is an error with the syscall
+   */
+  public static void posixFadviseIfPossible(
+      FileDescriptor fd, long offset, long len, int flags)
+      throws NativeIOException {
+    if (nativeLoaded && fadvisePossible) {
+      try {
+        posix_fadvise(fd, offset, len, flags);
+      } catch (UnsupportedOperationException uoe) {
+        fadvisePossible = false;
+      } catch (UnsatisfiedLinkError ule) {
+        fadvisePossible = false;
+      }
+    }
+  }
+
+  /**
+   * Call sync_file_range on the given file descriptor. See the manpage
+   * for this syscall for more information. On systems where this
+   * call is not available, does nothing.
+   *
+   * @throws NativeIOException if there is an error with the syscall
+   */
+  public static void syncFileRangeIfPossible(
+      FileDescriptor fd, long offset, long nbytes, int flags)
+      throws NativeIOException {
+    if (nativeLoaded && syncFileRangePossible) {
+      try {
+        sync_file_range(fd, offset, nbytes, flags);
+      } catch (UnsupportedOperationException uoe) {
+        syncFileRangePossible = false;
+      } catch (UnsatisfiedLinkError ule) {
+        syncFileRangePossible = false;
+      }
+    }
+  }
 
   /**
    * Result type of the fstat call

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/configure.ac?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/configure.ac
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/configure.ac
Thu Oct 27 22:19:01 2011
@@ -40,6 +40,7 @@ AC_CONFIG_AUX_DIR([config])
 AC_CONFIG_MACRO_DIR([m4])
 AC_CONFIG_HEADER([config.h])
 AC_SYS_LARGEFILE
+AC_GNU_SOURCE
 
 AM_INIT_AUTOMAKE(hadoop,1.0.0)
 
@@ -57,10 +58,8 @@ if test $JAVA_HOME != ""
 then
   JNI_LDFLAGS="-L$JAVA_HOME/jre/lib/$OS_ARCH/server"
 fi
-ldflags_bak=$LDFLAGS
 LDFLAGS="$LDFLAGS $JNI_LDFLAGS"
 AC_CHECK_LIB([jvm], [JNI_GetCreatedJavaVMs])
-LDFLAGS=$ldflags_bak
 AC_SUBST([JNI_LDFLAGS])
 
 # Checks for header files.
@@ -94,6 +93,12 @@ AC_CHECK_HEADERS([snappy-c.h], AC_COMPUT
 dnl Check for headers needed by the native Group resolution implementation
 AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))
 
+dnl check for posix_fadvise
+AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(posix_fadvise)])
+
+dnl check for sync_file_range
+AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(sync_file_range)])
+
 # Checks for typedefs, structures, and compiler characteristics.
 AC_C_CONST
 

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
Thu Oct 27 22:19:01 2011
@@ -29,6 +29,7 @@
 #include <string.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <sys/syscall.h>
 #include <unistd.h>
 
 #include "org_apache_hadoop.h"
@@ -234,6 +235,81 @@ cleanup:
 }
 
 
+
+/**
+ * public static native void posix_fadvise(
+ *   FileDescriptor fd, long offset, long len, int flags);
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise(
+  JNIEnv *env, jclass clazz,
+  jobject fd_object, jlong offset, jlong len, jint flags)
+{
+#ifndef HAVE_POSIX_FADVISE
+  THROW(env, "java/lang/UnsupportedOperationException",
+        "fadvise support not available");
+#else
+  int fd = fd_get(env, fd_object);
+  PASS_EXCEPTIONS(env);
+
+  int err = 0;
+  if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) {
+    throw_ioe(env, err);
+  }
+#endif
+}
+
+#if defined(HAVE_SYNC_FILE_RANGE)
+#  define my_sync_file_range sync_file_range
+#elif defined(SYS_sync_file_range)
+// RHEL 5 kernels have sync_file_range support, but the glibc
+// included does not have the library function. We can
+// still call it directly, and if it's not supported by the
+// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581
+static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags)
+{
+#ifdef __x86_64__
+  return syscall( SYS_sync_file_range, fd, from, to, flags);
+#else
+  return syscall (SYS_sync_file_range, fd,
+    __LONG_LONG_PAIR ((long) (from >> 32), (long) from),
+    __LONG_LONG_PAIR ((long) (to >> 32), (long) to),
+    flags);
+#endif
+}
+#define my_sync_file_range manual_sync_file_range
+#endif
+
+/**
+ * public static native void sync_file_range(
+ *   FileDescriptor fd, long offset, long nbytes, int flags);
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range(
+  JNIEnv *env, jclass clazz,
+  jobject fd_object, jlong offset, jlong len, jint flags)
+{
+#ifndef my_sync_file_range
+  THROW(env, "java/lang/UnsupportedOperationException",
+        "sync_file_range support not available");
+#else
+  int fd = fd_get(env, fd_object);
+  PASS_EXCEPTIONS(env);
+
+  if (my_sync_file_range(fd, (off_t)offset, (off_t)len, flags)) {
+    if (errno == ENOSYS) {
+      // we know the syscall number, but it's not compiled
+      // into the running kernel
+      THROW(env, "java/lang/UnsupportedOperationException",
+            "sync_file_range kernel support not available");
+      return;
+    } else {
+      throw_ioe(env, errno);
+    }
+  }
+#endif
+}
+
 /*
  * public static native FileDescriptor open(String path, int flags, int mode);
  */

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c
Thu Oct 27 22:19:01 2011
@@ -54,6 +54,11 @@ void fd_deinit(JNIEnv *env) {
  * underlying fd, or throw if unavailable
  */
 int fd_get(JNIEnv* env, jobject obj) {
+  if (obj == NULL) {
+    THROW(env, "java/lang/NullPointerException",
+          "FileDescriptor object is null");
+    return -1;
+  }
   return (*env)->GetIntField(env, obj, fd_descriptor);
 }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java?rev=1190066&r1=1190065&r2=1190066&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
Thu Oct 27 22:19:01 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.io.nativeio;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -210,6 +211,66 @@ public class TestNativeIO {
     assertPermissions(toChmod, 0644);
   }
 
+
+  @Test
+  public void testPosixFadvise() throws Exception {
+    FileInputStream fis = new FileInputStream("/dev/zero");
+    try {
+      NativeIO.posix_fadvise(fis.getFD(), 0, 0,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+    } catch (UnsupportedOperationException uoe) {
+      // we should just skip the unit test on machines where we don't
+      // have fadvise support
+      assumeTrue(false);
+    } finally {
+      fis.close();
+    }
+
+    try {
+      NativeIO.posix_fadvise(fis.getFD(), 0, 1024,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+
+      fail("Did not throw on bad file");
+    } catch (NativeIOException nioe) {
+      assertEquals(Errno.EBADF, nioe.getErrno());
+    }
+    
+    try {
+      NativeIO.posix_fadvise(null, 0, 1024,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+
+      fail("Did not throw on null file");
+    } catch (NullPointerException npe) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testSyncFileRange() throws Exception {
+    FileOutputStream fos = new FileOutputStream(
+      new File(TEST_DIR, "testSyncFileRange"));
+    try {
+      fos.write("foo".getBytes());
+      NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+                               NativeIO.SYNC_FILE_RANGE_WRITE);
+      // no way to verify that this actually has synced,
+      // but if it doesn't throw, we can assume it worked
+    } catch (UnsupportedOperationException uoe) {
+      // we should just skip the unit test on machines where we don't
+      // have sync_file_range support
+      assumeTrue(false);
+    } finally {
+      fos.close();
+    }
+    try { // fos was closed above, so the call below is expected to fail with EBADF
+      NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+                               NativeIO.SYNC_FILE_RANGE_WRITE);
+      fail("Did not throw on bad file");
+    } catch (NativeIOException nioe) {
+      assertEquals(Errno.EBADF, nioe.getErrno());
+    }
+  }
+
   private void assertPermissions(File f, int expected) throws IOException {
     FileSystem localfs = FileSystem.getLocal(new Configuration());
     FsPermission perms = localfs.getFileStatus(



Mime
View raw message