bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [1/7] bookkeeper git commit: bookie: fallocate & sync_file_range
Date Thu, 20 Apr 2017 17:35:23 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/sijie/bookkeeper_fallocate [created] 95028b9d8


bookie: fallocate & sync_file_range

- introduce fallocate & sync_file_range in NativeIO to provide better preallocation & file sync logic.
- if journalAdaptiveGroupWrites is disabled, use sync_file_range to sync only the range written since the last flush, at finer granularity than a whole-file sync
- add more stats on journal flush & creation.

RB_ID=260795


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/d9802962
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/d9802962
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/d9802962

Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: d9802962b20acf7a86209262871af7a2eb4cbed7
Parents: 95ea481
Author: Sijie Guo <sijieg@twitter.com>
Authored: Mon Jan 27 23:03:07 2014 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Thu Nov 17 17:17:21 2016 -0800

----------------------------------------------------------------------
 bookkeeper-server/bin/bookkeeper                |   9 +-
 bookkeeper-server/pom.xml                       |  73 ++++++++
 bookkeeper-server/src/CMakeLists.txt            | 142 ++++++++++++++++
 bookkeeper-server/src/JNIFlags.cmake            | 118 +++++++++++++
 bookkeeper-server/src/config.h.cmake            |  26 +++
 .../org/apache/bookkeeper/bookie/Journal.java   | 165 ++++++++++++-------
 .../bookkeeper/bookie/JournalChannel.java       | 137 ++++++++++++---
 .../java/org/apache/bookkeeper/util/Errno.java  | 115 +++++++++++++
 .../org/apache/bookkeeper/util/NativeIO.java    | 147 +++++++++++++++--
 .../src/org/apache/bookkeeper/util/NativeIO.c   | 122 ++++++++++++++
 10 files changed, 955 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/bin/bookkeeper
----------------------------------------------------------------------
diff --git a/bookkeeper-server/bin/bookkeeper b/bookkeeper-server/bin/bookkeeper
index 54be3fe..87429f9 100755
--- a/bookkeeper-server/bin/bookkeeper
+++ b/bookkeeper-server/bin/bookkeeper
@@ -180,10 +180,13 @@ if [ -z "$BOOKIE_LOG_CONF" ]; then
 fi
 
 BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH"
-BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
-OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"
-
-OPTS="-cp $BOOKIE_CLASSPATH $OPTS"
+# Put the log4j configuration's directory on the classpath and select the file by name.
+if [ -n "$BOOKIE_LOG_CONF" ]; then
+    BOOKIE_CLASSPATH="$(dirname "$BOOKIE_LOG_CONF"):$BOOKIE_CLASSPATH"
+    OPTS="$OPTS -Dlog4j.configuration=$(basename "$BOOKIE_LOG_CONF")"
+fi
+# Native library dir: the 'native' Maven profile runs cmake in target/native, which emits target/usr/local/lib.
+OPTS="-cp $BOOKIE_CLASSPATH -Djava.library.path=$BK_HOME/target/native/target/usr/local/lib $OPTS"
 
 OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index bd143f1..730a659 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -327,6 +327,80 @@
   </build>
   <profiles>
     <profile>
+      <id>native</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-os</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireOS>
+                      <!-- requireOS accepts a single family; Mac OS X belongs to the "unix" family -->
+                      <family>unix</family>
+                      <message>native build only supported on Mac or Unix</message>
+                    </requireOS>
+                  </rules>
+                  <fail>true</fail>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>native-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <phase>compile</phase>
+                <goals>
+                  <goal>javah</goal>
+                </goals>
+                <configuration>
+                  <javahPath>${env.JAVA_HOME}/bin/javah</javahPath>
+                  <javahClassNames>
+                    <javahClassName>org.apache.bookkeeper.util.NativeIO</javahClassName>
+                  </javahClassNames>
+                  <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>make</id>
+                <phase>compile</phase>
+                <goals><goal>run</goal></goals>
+                <configuration>
+                  <target>
+                    <!-- configure and build libbookkeeper out-of-tree under target/native (single make pass) -->
+                    <mkdir dir="${project.build.directory}/native"/>
+                    <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
+                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
+                    </exec>
+                    <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
+                      <arg line="VERBOSE=1"/>
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
       <id>protobuf</id>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/CMakeLists.txt b/bookkeeper-server/src/CMakeLists.txt
new file mode 100644
index 0000000..3d446b8
--- /dev/null
+++ b/bookkeeper-server/src/CMakeLists.txt
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# Default to release builds, but respect a build type chosen on the command line
+if (NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Release)
+endif (NOT CMAKE_BUILD_TYPE)
+
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+# Compile a library with both shared and static variants
+function(add_dual_library LIBNAME)
+    add_library(${LIBNAME} SHARED ${ARGN})
+    add_library(${LIBNAME}_static STATIC ${ARGN})
+    set_target_properties(${LIBNAME}_static PROPERTIES OUTPUT_NAME ${LIBNAME})
+endfunction(add_dual_library)
+
+# Link both a static and a dynamic target against some libraries
+function(target_link_dual_libraries LIBNAME)
+    target_link_libraries(${LIBNAME} ${ARGN})
+    target_link_libraries(${LIBNAME}_static ${ARGN})
+endfunction(target_link_dual_libraries)
+
+function(output_directory TGT DIR)
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+endfunction(output_directory TGT DIR)
+
+function(dual_output_directory TGT DIR)
+    output_directory(${TGT} "${DIR}")
+    output_directory(${TGT}_static "${DIR}")
+endfunction(dual_output_directory TGT DIR)
+
+#
+# This macro alters the behavior of find_package and find_library.
+# It does this by setting the CMAKE_FIND_LIBRARY_SUFFIXES global variable.
+# You should save that variable before calling this function and restore it
+# after you have accomplished your goal.
+#
+# The behavior is altered in two ways:
+# 1. We always find shared libraries, never static;
+# 2. We find shared libraries with the given version number.
+#
+# On Windows this function is a no-op.  Windows does not encode
+# version number information into library path names.
+#
+macro(set_find_shared_library_version LVERS)
+    IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+        # Mac OS uses .dylib
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+    ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+        # FreeBSD has always .so installed.
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+    ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+        # Windows doesn't support finding shared libraries by version.
+    ELSE()
+        # Most UNIX variants use .so
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}")
+    ENDIF()
+endmacro(set_find_shared_library_version LVERS)
+
+if (NOT GENERATED_JAVAH)
+    # Must identify where the generated headers have been placed
+    MESSAGE(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH")
+endif (NOT GENERATED_JAVAH)
+find_package(JNI REQUIRED)
+
+# Save/restore the suffix list by value (not by name) so that later
+# find_library() calls see the original suffixes again.
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+find_package(ZLIB REQUIRED)
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
+set(D main/native/src/org/apache/bookkeeper)
+set(T main/native/src/test/org/apache/bookkeeper)
+
+INCLUDE(CheckFunctionExists)
+INCLUDE(CheckCSourceCompiles)
+INCLUDE(CheckLibraryExists)
+# The HAVE_* results are substituted into config.h by the CONFIGURE_FILE call below.
+CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
+CHECK_FUNCTION_EXISTS(fallocate HAVE_FALLOCATE)
+CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
+CHECK_FUNCTION_EXISTS(posix_fallocate HAVE_POSIX_FALLOCATE)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
+include_directories(
+    ${GENERATED_JAVAH}
+    main/native/src
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/src
+    ${CMAKE_BINARY_DIR}
+    ${JNI_INCLUDE_DIRS}
+)
+CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
+
+SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
+add_dual_library(bookkeeper
+    ${D}/util/NativeIO.c
+)
+if (NEED_LINK_DL)
+   set(LIB_DL dl)
+endif (NEED_LINK_DL)
+
+IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+    #
+    # By embedding '$ORIGIN' into the RPATH of libbookkeeper.so,
+    # dlopen will look in the directory containing libbookkeeper.so.
+    # However, $ORIGIN is not supported by all operating systems.
+    #
+    SET_TARGET_PROPERTIES(bookkeeper
+        PROPERTIES INSTALL_RPATH "\$ORIGIN/")
+ENDIF()
+
+target_link_dual_libraries(bookkeeper
+    ${LIB_DL}
+    ${JAVA_JVM_LIBRARY}
+)
+SET(LIBBOOKKEEPER_VERSION "1.0.0")
+SET_TARGET_PROPERTIES(bookkeeper PROPERTIES
+    SOVERSION ${LIBBOOKKEEPER_VERSION})
+dual_output_directory(bookkeeper target/usr/local/lib)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/JNIFlags.cmake
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/JNIFlags.cmake b/bookkeeper-server/src/JNIFlags.cmake
new file mode 100644
index 0000000..8333285
--- /dev/null
+++ b/bookkeeper-server/src/JNIFlags.cmake
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
+# This variable is set by maven.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+    # Force 32-bit code generation on amd64/x86_64, ppc64, sparc64
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+        set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32")
+        set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -m32")
+    endif ()
+    if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use
+        # the 32-bit version of libjvm.so.
+        set(CMAKE_SYSTEM_PROCESSOR "i686")
+    endif ()
+endif (JVM_ARCH_DATA_MODEL EQUAL 32)
+
+IF("${CMAKE_SYSTEM}" MATCHES "Linux")
+    #
+    # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+    # Since we were invoked from Maven, we know that the JAVA_HOME environment
+    # variable is valid.  So we ignore system paths here and just use JAVA_HOME.
+    #
+    FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+    IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+        SET(_java_libarch "i386")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        SET(_java_libarch "amd64")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+        SET(_java_libarch "arm")
+    ELSE()
+        SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+    ENDIF()
+    SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+                  "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+                  "${_JAVA_HOME}/jre/lib/*"
+                  "${_JAVA_HOME}/jre/lib"
+                  "${_JAVA_HOME}/lib/*"
+                  "${_JAVA_HOME}/lib"
+                  "${_JAVA_HOME}/include/*"
+                  "${_JAVA_HOME}/include"
+                  "${_JAVA_HOME}"
+    )
+    FIND_PATH(JAVA_INCLUDE_PATH
+        NAMES jni.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    #In IBM java, it's jniport.h instead of jni_md.h
+    FIND_PATH(JAVA_INCLUDE_PATH2
+        NAMES jni_md.h jniport.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+    FIND_LIBRARY(JAVA_JVM_LIBRARY
+        NAMES jvm JavaVM
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+    MESSAGE("JAVA_HOME=${_JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+    MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+    IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+        MESSAGE("Located all JNI components successfully.")
+    ELSE()
+        MESSAGE(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+    ENDIF()
+ELSE()
+    find_package(JNI REQUIRED)
+ENDIF()
+
+# Determine float ABI of JVM on ARM Linux (runs after JAVA_JVM_LIBRARY is located above)
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+    find_program(READELF readelf)
+    if (READELF MATCHES "NOTFOUND")
+        message(WARNING "readelf not found; JVM float ABI detection disabled")
+    else (READELF MATCHES "NOTFOUND")
+        execute_process(
+            COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+            OUTPUT_VARIABLE JVM_ELF_ARCH
+            ERROR_QUIET)
+        if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+            message("Soft-float JVM detected")
+
+            # Test compilation with -mfloat-abi=softfp using an arbitrary libc function
+            # (typically fails with "fatal error: bits/predefs.h: No such file or directory"
+            # if soft-float dev libraries are not installed)
+            include(CMakePushCheckState)
+            cmake_push_check_state()
+            set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+            include(CheckSymbolExists)
+            check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+            if (NOT SOFTFP_AVAILABLE)
+                message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+            endif (NOT SOFTFP_AVAILABLE)
+            cmake_pop_check_state()
+
+            set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+        endif ()
+    endif (READELF MATCHES "NOTFOUND")
+endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/config.h.cmake b/bookkeeper-server/src/config.h.cmake
new file mode 100644
index 0000000..d460b7f
--- /dev/null
+++ b/bookkeeper-server/src/config.h.cmake
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef CONFIG_H
+#define CONFIG_H
+/* CMake's configure_file() turns each HAVE_* line below into a #define when the CMake variable of the same name is true, and into a commented-out #undef otherwise. */
+#cmakedefine HAVE_SYNC_FILE_RANGE
+#cmakedefine HAVE_FALLOCATE
+#cmakedefine HAVE_POSIX_FADVISE
+#cmakedefine HAVE_POSIX_FALLOCATE
+
+#endif /* CONFIG_H */

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 7be0984..dd62d28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -39,10 +39,11 @@ import com.google.common.base.Stopwatch;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.Stats;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
@@ -271,13 +276,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     /**
      * Journal Entry to Record
      */
-    private class QueueEntry implements Runnable {
-        ByteBuffer entry;
-        long ledgerId;
-        long entryId;
-        WriteCallback cb;
-        Object ctx;
-        long enqueueTime;
+    private class QueueEntry extends SafeRunnable {
+        final ByteBuffer entry;
+        final long ledgerId;
+        final long entryId;
+        final WriteCallback cb;
+        final Object ctx;
+        final long enqueueTime;
 
         QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
                    WriteCallback cb, Object ctx, long enqueueTime) {
@@ -304,19 +309,22 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         private final LinkedList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
         private final boolean isMarker;
-        private final long lastFlushedPosition;
+        private final long startFlushPosition;
+        private final long endFlushPosition;
         private final long logId;
 
         private ForceWriteRequest(JournalChannel logFile,
                           long logId,
-                          long lastFlushedPosition,
+                          long startFlushPosition,
+                          long endFlushPosition,
                           LinkedList<QueueEntry> forceWriteWaiters,
                           boolean shouldClose,
                           boolean isMarker) {
             this.forceWriteWaiters = forceWriteWaiters;
             this.logFile = logFile;
             this.logId = logId;
-            this.lastFlushedPosition = lastFlushedPosition;
+            this.startFlushPosition = startFlushPosition;
+            this.endFlushPosition = endFlushPosition;
             this.shouldClose = shouldClose;
             this.isMarker = isMarker;
             forceWriteQueueSize.inc();
@@ -324,22 +332,24 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
 
         public int process(boolean shouldForceWrite) throws IOException {
             forceWriteQueueSize.dec();
+
             if (isMarker) {
                 return 0;
             }
 
             try {
                 if (shouldForceWrite) {
-                    long startTime = MathUtils.nowInNano();
-                    this.logFile.forceWrite(false);
-                    journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    if (enableGroupForceWrites) {
+                        this.logFile.forceWrite(false);
+                    } else {
+                        this.logFile.syncRangeOrForceWrite(this.startFlushPosition,
+                            this.endFlushPosition - this.startFlushPosition);
+                    }
                 }
-                lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
+                lastLogMark.setCurLogMark(this.logId, this.endFlushPosition);
 
                 // Notify the waiters that the force write succeeded
-                for (QueueEntry e : this.forceWriteWaiters) {
-                    cbThreadPool.submit(e);
-                }
+                callback();
 
                 return this.forceWriteWaiters.size();
             }
@@ -348,6 +358,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             }
         }
 
+        void callback() {
+            for (QueueEntry e : this.forceWriteWaiters) {
+                if (null != e.ctx) {
+                    cbThreadPool.submitOrdered(e.ctx, e);
+                } else {
+                    cbThreadPool.submit(e);
+                }
+            }
+        }
+
         public void closeFileIfNecessary() {
             // Close if shouldClose is set
             if (shouldClose) {
@@ -374,14 +394,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         // This holds the queue entries that should be notified after a
         // successful force write
         Thread threadToNotifyOnEx;
-        // should we group force writes
-        private final boolean enableGroupForceWrites;
         // make flush interval as a parameter
-        public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
+        public ForceWriteThread(Thread threadToNotifyOnEx) {
             super("ForceWriteThread");
             this.threadToNotifyOnEx = threadToNotifyOnEx;
-            this.enableGroupForceWrites = enableGroupForceWrites;
         }
+
         @Override
         public void run() {
             LOG.info("ForceWrite Thread started");
@@ -394,25 +412,23 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
 
                     // Force write the file and then notify the write completions
                     //
-                    if (!req.isMarker) {
-                        if (shouldForceWrite) {
-                            // if we are going to force write, any request that is already in the
-                            // queue will benefit from this force write - post a marker prior to issuing
-                            // the flush so until this marker is encountered we can skip the force write
-                            if (enableGroupForceWrites) {
-                                forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true));
-                            }
+                    if (!req.isMarker && shouldForceWrite) {
+                        // if we are going to force write, any request that is already in the
+                        // queue will benefit from this force write - post a marker prior to issuing
+                        // the flush so until this marker is encountered we can skip the force write
+                        if (enableGroupForceWrites) {
+                            forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, 0, null, false, true));
+                        }
 
-                            // If we are about to issue a write, record the number of requests in
-                            // the last force write and then reset the counter so we can accumulate
-                            // requests in the write we are about to issue
-                            if (numReqInLastForceWrite > 0) {
-                                forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
-                                numReqInLastForceWrite = 0;
-                            }
+                        // If we are about to issue a write, record the number of requests in
+                        // the last force write and then reset the counter so we can accumulate
+                        // requests in the write we are about to issue
+                        if (numReqInLastForceWrite > 0) {
+                            forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
+                            numReqInLastForceWrite = 0;
                         }
-                        numReqInLastForceWrite += req.process(shouldForceWrite);
                     }
+                    numReqInLastForceWrite += req.process(shouldForceWrite);
 
                     if (enableGroupForceWrites &&
                         // if its a marker we should switch back to flushing
@@ -493,6 +509,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     final File journalDirectory;
     final ServerConfiguration conf;
     final ForceWriteThread forceWriteThread;
+    // should we group force writes
+    private final boolean enableGroupForceWrites;
     // Time after which we will stop grouping and issue the flush
     private final long maxGroupWaitInNanos;
     // Threshold after which we flush any buffered journal entries
@@ -503,13 +521,17 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     private final boolean flushWhenQueueEmpty;
     // should we hint the filesystem to remove pages from cache after force write
     private final boolean removePagesFromCache;
+    // journal align size
+    private final int journalAlignmentSize;
+    // journal format version to write
+    private final int journalFormatVersionToWrite;
 
     private final LastLogMark lastLogMark = new LastLogMark(0, 0);
 
     /**
      * The thread pool used to handle callback.
      */
-    private final ExecutorService cbThreadPool;
+    private final OrderedSafeExecutor cbThreadPool;
 
     // journal entry queue to commit
     final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
@@ -548,12 +570,19 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
         this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
-        this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
+        this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
+        this.forceWriteThread = new ForceWriteThread(this);
         this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
         this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
         this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
-        this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
-                                                         new DaemonThreadFactory());
+        this.journalAlignmentSize = conf.getJournalAlignmentSize();
+        this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
+        this.cbThreadPool = OrderedSafeExecutor.newBuilder()
+                .name("BookieJournal")
+                .numThreads(conf.getNumJournalCallbackThreads())
+                .statsLogger(Stats.get().getStatsLogger("journal"))
+                .threadFactory(new DaemonThreadFactory())
+                .build();
 
         // Unless there is a cap on the max wait (which requires group force writes)
         // we cannot skip flushing for queue empty
@@ -646,9 +675,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
-            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
+            recLog = new JournalChannel(journalDirectory, journalId,
+                    journalPreAllocSize, journalWriteBufferSize, statsLogger);
         } else {
-            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos);
+            recLog = new JournalChannel(journalDirectory, journalId,
+                    journalPreAllocSize, journalWriteBufferSize, journalPos, statsLogger);
         }
         int journalVersion = recLog.getFormatVersion();
         try {
@@ -701,6 +732,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                 if (!isPaddingRecord) {
                     scanner.process(journalVersion, offset, recBuff);
                 }
+                // advance the current log mark to every replayed record (padding records included)
+                lastLogMark.setCurLogMark(journalId, offset);
             }
         } finally {
             recLog.close();
@@ -785,10 +818,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
     public void run() {
         LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
-        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize());
+        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * journalAlignmentSize);
         ZeroBuffer.put(paddingBuff);
         JournalChannel logFile = null;
         forceWriteThread.start();
+        Stopwatch journalAllocationWatcher = new Stopwatch(); // NOTE(review): presumably times preallocation; not used in the visible hunks
         Stopwatch journalCreationWatcher = new Stopwatch();
         Stopwatch journalFlushWatcher = new Stopwatch();
         long batchSize = 0;
@@ -799,7 +833,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
             long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
             BufferedChannel bc = null;
-            long lastFlushPosition = 0;
+            long lastFlushPosition = 0L; // file offset reached by the most recent flush of bc
             boolean groupWhenTimeout = false;
 
             long dequeueStartTime = 0L;
@@ -815,15 +849,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                                         logId,
                                         journalPreAllocSize,
                                         journalWriteBufferSize,
-                                        conf.getJournalAlignmentSize(),
+                                        journalAlignmentSize,
                                         removePagesFromCache,
-                                        conf.getJournalFormatVersionToWrite());
+                                        journalFormatVersionToWrite,
+                                        statsLogger);
                     journalCreationStats.registerSuccessfulEvent(
                             journalCreationWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
                     bc = logFile.getBufferedChannel();
 
-                    lastFlushPosition = bc.position();
+                    lastFlushPosition = 0; // new file: the first synced range starts at 0, so it covers the header
                 }
 
                 if (qe == null) {
@@ -880,14 +915,21 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
 
                         // toFlush is non null and not empty so should be safe to access getFirst
                         if (shouldFlush) {
-                            if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) {
-                                writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize());
-                            }
+                            long prevFlushPosition = lastFlushPosition; // start of the range flushed by this batch
+
                             journalFlushWatcher.reset().start();
+                            if (journalFormatVersionToWrite >= JournalChannel.V5) {
+                                writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
+                            }
                             bc.flush(false);
                             lastFlushPosition = bc.position();
+
+                            // start async writeback of [prevFlushPosition, lastFlushPosition); arg 2 is a length
+                            if (!enableGroupForceWrites) {
+                                logFile.startSyncRange(prevFlushPosition, lastFlushPosition - prevFlushPosition);
+                            }
+                            journalFlushLatencyStats.registerSuccessfulEvent(
+                                    journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS));
-                            journalFlushStats.registerSuccessfulEvent(
-                                    journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
                             // Trace the lifetime of entries through persistence
                             if (LOG.isDebugEnabled()) {
@@ -899,7 +941,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                             forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
                             forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
 
-                            forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
+                            forceWriteRequests.put(new ForceWriteRequest(logFile, logId, prevFlushPosition, // flushed range [prev, last)
+                                    lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
                             toFlush = new LinkedList<QueueEntry>();
                             batchSize = 0L;
                             // check whether journal file is over file limit
@@ -936,8 +979,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
                 // we should be doing the following, but then we run out of
                 // direct byte buffers
                 // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
-                bc.write(lenBuff);
-                bc.write(qe.entry);
+                int flushes = 0; // presumably bc.write returns how many buffer flushes it triggered - confirm
+                flushes += bc.write(lenBuff);
+                flushes += bc.write(qe.entry);
+
+                journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes);
+                journalMemAddLatencyStats.registerSuccessfulEvent(
+                        MathUtils.elapsedMicroSec(qe.enqueueTime));
 
                 toFlush.add(qe);
                 qe = null;
@@ -970,11 +1018,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
             LOG.info("Shutting down Journal");
             forceWriteThread.shutdown();
             cbThreadPool.shutdown();
-            if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
+            if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) {
                 LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
             }
-            cbThreadPool.shutdownNow();
-
             running = false;
             this.interrupt();
             this.join();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index ad46e5c..e3077e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -28,13 +28,23 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.NativeIO;
 import org.apache.bookkeeper.util.ZeroBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
+import static org.apache.bookkeeper.util.NativeIO.*;
 
 /**
  * Simple wrapper around FileChannel to add versioning
@@ -81,28 +91,36 @@ class JournalChannel implements Closeable {
     // The position of the file channel's last drop position
     private long lastDropPosition = 0L;
 
+    // Stats: preallocation latency, number of force writes and force-write latency
+    private final OpStatsLogger journalPreallocationStats;
+    private final Counter journalForceWriteCounter;
+    private final OpStatsLogger journalForceWriteStats;
+
     // Mostly used by tests
     JournalChannel(File journalDirectory, long logId) throws IOException {
-        this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE);
+        this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, NullStatsLogger.INSTANCE);
     }
 
     // Open journal for scanning starting from the first record in journal.
-    JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
-        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
+    JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, StatsLogger statsLogger)
+            throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, statsLogger);
     }
 
     // Open journal for scanning starting from given position.
     JournalChannel(File journalDirectory, long logId,
-                   long preAllocSize, int writeBufferSize, long position) throws IOException {
-         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+                   long preAllocSize, int writeBufferSize, long position, StatsLogger statsLogger)
+            throws IOException {
+         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5, statsLogger);
     }
 
     // Open journal to write
     JournalChannel(File journalDirectory, long logId,
                    long preAllocSize, int writeBufferSize, int journalAlignSize,
-                   boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
+                   boolean fRemoveFromPageCache, int formatVersionToWrite,
+                   StatsLogger statsLogger) throws IOException {
         this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
-             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, statsLogger);
     }
 
     /**
@@ -124,12 +142,20 @@ class JournalChannel implements Closeable {
      *          whether to remove cached pages from page cache.
      * @param formatVersionToWrite
      *          format version to write
+     * @param statsLogger
+     *          stats logger used to create the preallocation and force-write metrics
      * @throws IOException
      */
-    private JournalChannel(File journalDirectory, long logId,
-                           long preAllocSize, int writeBufferSize, int journalAlignSize,
-                           long position, boolean fRemoveFromPageCache,
-                           int formatVersionToWrite) throws IOException {
+    private JournalChannel(File journalDirectory,
+                           long logId,
+                           long preAllocSize,
+                           int writeBufferSize,
+                           int journalAlignSize,
+                           long position,
+                           boolean fRemoveFromPageCache,
+                           int formatVersionToWrite,
+                           StatsLogger statsLogger)
+            throws IOException {
         this.journalAlignSize = journalAlignSize;
         this.zeros = ByteBuffer.allocate(journalAlignSize);
         this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -149,9 +175,13 @@ class JournalChannel implements Closeable {
                         + " suddenly appeared, is another bookie process running?");
             }
             randomAccessFile = new RandomAccessFile(fn, "rw");
+            fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
             fc = randomAccessFile.getChannel();
             formatVersion = formatVersionToWrite;
 
+            // preallocate a chunk starting at nextPrealloc before writing the header
+            preallocate();
+
             int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
             ByteBuffer bb = ByteBuffer.allocate(headerSize);
             ZeroBuffer.put(bb);
@@ -162,11 +192,12 @@ class JournalChannel implements Closeable {
             fc.write(bb);
 
             bc = new BufferedChannel(fc, writeBufferSize);
-            forceWrite(true);
-            nextPrealloc = this.preAllocSize;
-            fc.write(zeros, nextPrealloc - journalAlignSize);
+
+            // NOTE(review): the header is no longer force-written here (forceWrite(true) was
+            // removed); confirm the first synced range of the file covers it durably.
         } else {  // open an existing file
             randomAccessFile = new RandomAccessFile(fn, "r");
+            fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
             fc = randomAccessFile.getChannel();
             bc = null; // readonly
 
@@ -215,7 +246,13 @@ class JournalChannel implements Closeable {
                 LOG.error("Bookie journal file can seek to position :", e);
             }
         }
-        this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
+
+        // Stats: metrics recorded by preAllocIfNeeded, forceWrite and syncRangeOrForceWrite
+        this.journalForceWriteCounter = statsLogger.getCounter(JOURNAL_NUM_FORCE_WRITES);
+        this.journalForceWriteStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_LATENCY);
+        this.journalPreallocationStats = statsLogger.getOpStatsLogger(JOURNAL_PREALLOCATION);
+
+        LOG.info("Opened journal {} : fd {}", fn, fd);
     }
 
     int getFormatVersion() {
@@ -229,14 +266,33 @@ class JournalChannel implements Closeable {
         return bc;
     }
 
-    void preAllocIfNeeded(long size) throws IOException {
-        if (bc.position() + size > nextPrealloc) {
-            nextPrealloc += preAllocSize;
+    private void preallocate() throws IOException { // reserve the next preAllocSize bytes after nextPrealloc
+        long prevPrealloc = nextPrealloc;
+        nextPrealloc = prevPrealloc + preAllocSize;
+        if (!NativeIO.fallocateIfPossible(fd, prevPrealloc, preAllocSize)) { // fallback: write a zero block at the chunk end
             zeros.clear();
             fc.write(zeros, nextPrealloc - journalAlignSize);
         }
     }
 
+    void preAllocIfNeeded(long size) throws IOException {
+        preAllocIfNeeded(size, null);
+    }
+
+    void preAllocIfNeeded(long size, Stopwatch stopwatch) throws IOException { // null stopwatch: latency not recorded
+        if (bc.position() + size > nextPrealloc) {
+            if (null != stopwatch) {
+                stopwatch.reset().start();
+            }
+            preallocate();
+            if (null != stopwatch) {
+                journalPreallocationStats.registerSuccessfulEvent(
+                        stopwatch.stop().elapsedTime(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
+            }
+        }
+    }
+
     int read(ByteBuffer dst)
             throws IOException {
         return fc.read(dst);
@@ -246,11 +302,33 @@ class JournalChannel implements Closeable {
         fc.close();
     }
 
+    public void startSyncRange(long offset, long bytes) throws IOException { // initiate writeback of [offset, offset + bytes) without waiting
+        NativeIO.syncFileRangeIfPossible(fd, offset, bytes, SYNC_FILE_RANGE_WRITE);
+    }
+
+    public boolean syncRangeIfPossible(long offset, long bytes) throws IOException { // false => caller must fall back to a force write
+        if (NativeIO.syncFileRangeIfPossible(fd, offset, bytes,
+                SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER)) {
+            removeFromPageCacheIfPossible(offset + bytes);
+            return true; // range written back; no fsync fallback needed
+        } else {
+            return false;
+        }
+    }
+
     public void forceWrite(boolean forceMetadata) throws IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Journal ForceWrite");
         }
-        long newForceWritePosition = bc.forceWrite(forceMetadata);
+        long startTimeNanos = MathUtils.nowInNano();
+        forceWriteImpl(forceMetadata);
+        // collect stats
+        journalForceWriteCounter.inc();
+        journalForceWriteStats.registerSuccessfulEvent(
+                MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
+    }
+
+    private void removeFromPageCacheIfPossible(long offset) {
         //
         // For POSIX_FADV_DONTNEED, we want to drop from the beginning
         // of the file to a position prior to the current position.
@@ -265,11 +343,28 @@ class JournalChannel implements Closeable {
         // lastDropPosition     newDropPos             lastForceWritePosition
         //
         if (fRemoveFromPageCache) {
-            long newDropPos = newForceWritePosition - CACHE_DROP_LAG_BYTES;
+            long newDropPos = offset - CACHE_DROP_LAG_BYTES;
             if (lastDropPosition < newDropPos) {
                 NativeIO.bestEffortRemoveFromPageCache(fd, lastDropPosition, newDropPos - lastDropPosition);
             }
             this.lastDropPosition = newDropPos;
         }
     }
+
+    private void forceWriteImpl(boolean forceMetadata) throws IOException { // force bc, then drop cached pages; no stats
+        long newForceWritePosition = bc.forceWrite(forceMetadata);
+        removeFromPageCacheIfPossible(newForceWritePosition);
+    }
+
+    public void syncRangeOrForceWrite(long offset, long bytes) throws IOException { // sync_file_range on [offset, offset + bytes), else bc.forceWrite(false)
+        long startTimeNanos = MathUtils.nowInNano();
+        if (!syncRangeIfPossible(offset, bytes)) {
+            forceWriteImpl(false);
+        }
+        // collect stats
+        journalForceWriteCounter.inc();
+        journalForceWriteStats.registerSuccessfulEvent(
+                MathUtils.elapsedMicroSec(startTimeNanos),
+                TimeUnit.MICROSECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
new file mode 100644
index 0000000..e5d8ae8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
@@ -0,0 +1,115 @@
+package org.apache.bookkeeper.util;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+
+public class Errno {
+
+    private static final InterfaceDelegate delegate =
+            (InterfaceDelegate) Native.loadLibrary("c", InterfaceDelegate.class);
+
+    /**
+     * The routine perror() produces a message on the standard error output,
+     * describing the last error encountered during a call to a system or
+     * library function. First (if s is not NULL and *s is not a null byte
+     * ('\0')) the argument string s is printed, followed by a colon and a
+     * blank. Then the message and a new-line.
+     *
+     * To be of most use, the argument string should include the name of the
+     * function that incurred the error. The error number is taken from the
+     * external variable errno, which is set when errors occur but not cleared
+     * when non-erroneous calls are made.
+     *
+     * The global error list sys_errlist[] indexed by errno can be used to
+     * obtain the error message without the newline. The largest message number
+     * provided in the table is sys_nerr -1. Be careful when directly accessing
+     * this list because new error values may not have been added to
+     * sys_errlist[].
+     *
+     * When a system call fails, it usually returns -1 and sets the variable
+     * errno to a value describing what went wrong. (These values can be found
+     * in {@code <errno.h>}.) Many library functions do likewise. The function perror()
+     * serves to translate this error code into human-readable form. Note that
+     * errno is undefined after a successful library call: this call may well
+     * change this variable, even though it succeeds, for example because it
+     * internally used some other library function that failed. Thus, if a
+     * failing call is not immediately followed by a call to perror(), the value
+     * of errno should be saved.
+     * @return always 0: C perror() returns void, so there is no status to forward.
+     */
+    public static int perror(String s) {
+        delegate.perror(s);
+        return 0;
+    }
+
+    /**
+     * The strerror() function returns a string describing the error code passed
+     * in the argument errnum, possibly using the LC_MESSAGES part of the
+     * current locale to select the appropriate language. This string must not
+     * be modified by the application, but may be modified by a subsequent call
+     * to perror() or strerror(). No library function will modify this string.
+     *
+     * The strerror_r() function is similar to strerror(), but is thread safe.
+     * This function is available in two versions: an XSI-compliant version
+     * specified in POSIX.1-2001, and a GNU-specific version (available since
+     * glibc 2.0). If _XOPEN_SOURCE is defined with the value 600, then the
+     * XSI-compliant version is provided, otherwise the GNU-specific version is
+     * provided.
+     *
+     * The XSI-compliant strerror_r() is preferred for portable applications. It
+     * returns the error string in the user-supplied buffer buf of length
+     * buflen.
+     *
+     * The GNU-specific strerror_r() returns a pointer to a string containing
+     * the error message. This may be either a pointer to a string that the
+     * function stores in buf, or a pointer to some (immutable) static string
+     * (in which case buf is unused). If the function stores a string in buf,
+     * then at most buflen bytes are stored (the string may be truncated if
+     * buflen is too small) and the string always includes a terminating null
+     * byte.
+     * NOTE: this wrapper calls strerror(), not strerror_r(), so it may not be thread-safe.
+     */
+    public static String strerror(int errnum) {
+        return delegate.strerror(errnum);
+    }
+
+    public static String strerror() {
+        return strerror(errno());
+    }
+
+    /**
+     * The {@code <errno.h>} header file defines the integer variable errno, which is
+     * set by system calls and some library functions in the event of an error
+     * to indicate what went wrong. Its value is significant only when the call
+     * returned an error (usually -1), and a function that does succeed is
+     * allowed to change errno.
+     *
+     * Sometimes, when -1 is also a valid successful return value one has to
+     * zero errno before the call in order to detect possible errors.
+     *
+     * errno is defined by the ISO C standard to be a modifiable lvalue of type
+     * int, and must not be explicitly declared; errno may be a macro. errno is
+     * thread-local; setting it in one thread does not affect its value in any
+     * other thread.
+     *
+     * Valid error numbers are all non-zero; errno is never set to zero by any
+     * library function. All the error names specified by POSIX.1 must have
+     * distinct values, with the exception of EAGAIN and EWOULDBLOCK, which may
+     * be the same.
+     *
+     * NOTE(review): {@link Native#getLastError()} may only reflect the errno that JNA
+     * saved after its own native calls on this thread; errno set by other bindings
+     * (e.g. JNI methods) might not be observed. Confirm against the JNA version in use.
+     *
+     */
+    public static int errno() {
+        return Native.getLastError();
+    }
+
+    interface InterfaceDelegate extends Library {
+
+        void perror(String s);
+        String strerror(int errnum);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
index 2448842..9eb3a68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
@@ -20,34 +20,56 @@ package org.apache.bookkeeper.util;
 
 import java.lang.reflect.Field;
 import java.io.FileDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.sun.jna.LastErrorException;
-import com.sun.jna.Native;
 
 public final class NativeIO {
     private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
 
     private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
 
+    /**
+     * Wait upon writeout of all pages in the range before performing the write.
+     */
+    public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+    /**
+     * Initiate writeout of all those dirty pages in the range which are not presently
+     * under writeback.
+     */
+    public static final int SYNC_FILE_RANGE_WRITE = 2;
+    /**
+     * Wait upon writeout of all pages in the range after performing the write.
+     */
+    public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+    private static final int FALLOC_FL_KEEP_SIZE = 1; /* falloc.h: allocate blocks without changing the file size */
+
     private static boolean initialized = false;
     private static boolean fadvisePossible = true;
+    private static boolean syncFileRangePossible = true;
+    private static boolean sysFallocatePossible = true;
+    private static boolean posixFallocatePossible = true;
 
     static {
         try {
-            Native.register("c");
+            LOG.info("Loading bookkeeper native library.");
+            System.loadLibrary("bookkeeper");
             initialized = true;
-        } catch (NoClassDefFoundError e) {
-            LOG.info("JNA not found. Native methods will be disabled.");
-        } catch (UnsatisfiedLinkError e) {
-            LOG.info("Unable to link C library. Native methods will be disabled.");
-        } catch (NoSuchMethodError e) {
-            LOG.warn("Obsolete version of JNA present; unable to register C library");
+            LOG.info("Loaded bookkeeper native library. Enabled Native IO.");
+        } catch (Throwable t) {
+            LOG.info("Unable to load bookkeeper native library. Native methods will be disabled : ", t);
         }
     }
 
     // fadvice
-    public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
+    public static native int posix_fadvise(int fd, long offset, long len, int flag);
+    // posix_fallocate(3); the C call reports failure via its return value, not errno
+    public static native int posix_fallocate(int fd, long offset, long len);
+    // fallocate(2); the C call returns -1 and sets errno on failure
+    public static native int fallocate(int fd, int mode, long offset, long len);
+    // sync_file_range(2)
+    public static native int sync_file_range(int fd, long offset, long len, int flags);
 
     private NativeIO() {}
 
@@ -66,6 +88,7 @@ public final class NativeIO {
 
         return field;
     }
+
     /**
      * Get system file descriptor (int) from FileDescriptor object.
      * @param descriptor - FileDescriptor object to get fd from
@@ -82,6 +105,92 @@ public final class NativeIO {
         return -1;
     }
 
+    public static boolean fallocateIfPossible(int fd, long offset, long nbytes) { // fallocate(2) first, then posix_fallocate(3)
+        if (!initialized || fd < 0) {
+            return false;
+        }
+        boolean allocated = false;
+        if (sysFallocatePossible) {
+            allocated = sysFallocateIfPossible(fd, offset, nbytes);
+        }
+        if (!allocated && posixFallocatePossible) {
+            allocated = posixFallocateIfPossible(fd, offset, nbytes);
+        }
+        return allocated;
+    }
+
+    private static boolean sysFallocateIfPossible(int fd, long offset, long nbytes) { // keeps file size; -1 + errno on failure
+        try {
+            int rc = fallocate(fd, FALLOC_FL_KEEP_SIZE, offset, nbytes);
+            if (rc != 0) {
+                LOG.error("Failed on sys fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("sys fallocate isn't supported : ", uoe);
+            sysFallocatePossible = false;
+        } catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            sysFallocatePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return sysFallocatePossible;
+    }
+
+    private static boolean posixFallocateIfPossible(int fd, long offset, long nbytes) { // rc is the error number itself
+        try {
+            int rc = posix_fallocate(fd, offset, nbytes);
+            if (rc != 0) {
+                LOG.error("Failed on posix_fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror(rc) });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("posix_fallocate isn't supported : ", uoe);
+            posixFallocatePossible = false;
+        } catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            posixFallocatePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return posixFallocatePossible;
+    }
+
+    public static boolean syncFileRangeIfPossible(int fd, long offset, long nbytes, int flags) { // false on failure or if unsupported
+        if (!initialized || !syncFileRangePossible || fd < 0) {
+            return false;
+        }
+        try {
+            int rc = sync_file_range(fd, offset, nbytes, flags);
+            if (rc != 0) {
+                LOG.error("Failed on syncing file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("sync_file_range isn't supported : ", uoe);
+            syncFileRangePossible = false;
+        } catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            syncFileRangePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return syncFileRangePossible;
+    }
+
     /**
      * Remove pages from the file system page cache when they wont
      * be accessed again
@@ -89,16 +198,22 @@ public final class NativeIO {
      * @param fd     The file descriptor of the source file.
      * @param offset The offset within the file.
      * @param len    The length to be flushed.
-     *
-     * @throws nothing => Best effort
      */
-
     public static void bestEffortRemoveFromPageCache(int fd, long offset, long len) {
+        posixFadviseIfPossible(fd, offset, len, POSIX_FADV_DONTNEED);
+    }
+
+    public static boolean posixFadviseIfPossible(int fd, long offset, long len, int flags) {
         if (!initialized || !fadvisePossible || fd < 0) {
-            return;
+            return false;
         }
         try {
-            posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+            int rc = posix_fadvise(fd, offset, len, flags);
+            if (rc != 0) {
+                LOG.error("Failed on posix_fadvise file descriptor {}, offset {}, bytes {}, flags {}, rc {} : {}",
+                        new Object[] { fd, offset, len, flags, rc, Errno.strerror() });
+                return false;
+            }
         } catch (UnsupportedOperationException uoe) {
             LOG.warn("posix_fadvise is not supported : ", uoe);
             fadvisePossible = false;
@@ -113,7 +228,9 @@ public final class NativeIO {
             // exception and forget
             LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {} : ",
                     new Object[] { fd, offset, e });
+            return false;
         }
+        return fadvisePossible;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
new file mode 100644
index 0000000..b93bde4
--- /dev/null
+++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <jni.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <asm-x86_64/unistd.h>
+#include "config.h"
+
+#if defined(HAVE_SYNC_FILE_RANGE)
+#  define my_sync_file_range sync_file_range
+#elif defined(__NR_sync_file_range)
+/*
+ * RHEL 5 kernels support sync_file_range, but the glibc they ship does not
+ * provide the library wrapper. We can still issue the syscall directly; if
+ * the kernel does not implement it we get ENOSYS. See RedHat Bugzilla #518581.
+ *
+ * Returns 0 on success, or -1 with errno set (syscall(2) convention).
+ */
+static int manual_sync_file_range (int fd, __off64_t offset, __off64_t nbytes, unsigned int flags)
+{
+#if defined(__x86_64__) || defined(__LP64__)
+  /* 64-bit ABIs pass each 64-bit argument in a single register, so the
+   * values go through unchanged (not only on x86_64, e.g. aarch64 too). */
+  return syscall(__NR_sync_file_range, fd, offset, nbytes, flags);
+#else
+  /* 32-bit ABIs: split each 64-bit argument into two longs;
+   * __LONG_LONG_PAIR (glibc <endian.h>) orders the halves for the target's
+   * endianness.
+   * NOTE(review): some 32-bit ABIs (e.g. ARM EABI, MIPS o32) need a different
+   * layout or sync_file_range2; verify before relying on this path there. */
+  return syscall(__NR_sync_file_range, fd,
+    __LONG_LONG_PAIR ((long) (offset >> 32), (long) offset),
+    __LONG_LONG_PAIR ((long) (nbytes >> 32), (long) nbytes),
+    flags);
+#endif
+}
+#define my_sync_file_range manual_sync_file_range
+#endif
+
+/**
+ * JNI implementation of NativeIO's native sync_file_range(int fd, long offset,
+ * long len, int flags), which returns an int (the Java caller does
+ * `int rc = sync_file_range(...)`).
+ *
+ * JNI encodes '_' in a Java method name as "_1", hence "sync_1file_1range".
+ *
+ * Returns 0 on success; -1 with errno set on failure, including ENOSYS when
+ * no sync_file_range implementation was available at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_sync_1file_1range(
+  JNIEnv *env, jclass clazz,
+  jint fd, jlong offset, jlong len, jint flags)
+{
+  (void)env;
+  (void)clazz;
+#ifndef my_sync_file_range
+  errno = ENOSYS;
+  return -1;
+#else
+  /* Both glibc's sync_file_range and the manual fallback take 64-bit
+   * offsets; do not narrow the jlongs through off_t, which is 32-bit on
+   * 32-bit builds without _FILE_OFFSET_BITS=64. */
+  return my_sync_file_range(fd, offset, len, (unsigned int)flags);
+#endif
+}
+
+#if defined(HAVE_FALLOCATE)
+#  define my_fallocate fallocate
+#elif defined(__NR_fallocate)
+/*
+ * Fallback for C libraries without a fallocate() wrapper: issue the syscall
+ * directly. Returns 0 on success, or -1 with errno set (ENOSYS if the kernel
+ * does not implement it).
+ */
+static int manual_fallocate (int fd, int mode, __off64_t offset, __off64_t len)
+{
+#if defined(__x86_64__) || defined(__LP64__)
+  /* 64-bit ABIs pass each 64-bit argument in a single register. */
+  return syscall(__NR_fallocate, fd, mode, offset, len);
+#else
+  /* 32-bit ABIs: split each 64-bit argument into two longs ordered by
+   * __LONG_LONG_PAIR (glibc <endian.h>) for the target's endianness.
+   * NOTE(review): confirm the layout on ABIs that require register-pair
+   * alignment padding (e.g. ARM EABI, MIPS o32). */
+  return syscall(__NR_fallocate, fd, mode,
+    __LONG_LONG_PAIR ((long) (offset >> 32), (long) offset),
+    __LONG_LONG_PAIR ((long) (len >> 32), (long) len));
+#endif
+}
+#define my_fallocate manual_fallocate
+#endif
+
+/*
+ * JNI implementation of NativeIO's native fallocate; judging by the symbol
+ * and parameter types, presumably declared in Java as
+ * int fallocate(int fd, int mode, long offset, long len) -- confirm.
+ *
+ * Returns 0 on success; -1 with errno set on failure, including ENOSYS when
+ * no fallocate implementation was available at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_fallocate(
+  JNIEnv *env, jclass clazz,
+  jint fd, jint mode, jlong offset, jlong len)
+{
+  (void)env;
+  (void)clazz;
+#ifndef my_fallocate
+  errno = ENOSYS;
+  return -1;
+#else
+  /* No explicit (off_t) cast: the direct-syscall fallback takes 64-bit
+   * offsets, and narrowing through off_t would truncate them on 32-bit
+   * builds. The libc wrapper converts to its own parameter type anyway. */
+  return my_fallocate(fd, mode, offset, len);
+#endif
+}
+
+/*
+ * JNI implementation of NativeIO's native posix_fadvise(int fd, long offset,
+ * long len, int flags).
+ *
+ * posix_fadvise(3) reports failure through its return value (an error
+ * number) and does not set errno. The Java caller logs Errno.strerror() when
+ * rc != 0, so the error number is also copied into errno; the return value
+ * itself is unchanged.
+ * NOTE(review): the JVM may still overwrite errno before the Java side reads
+ * it; the logged rc is the authoritative error code.
+ *
+ * Returns 0 on success, an error number on failure, or -1 with errno=ENOSYS
+ * when posix_fadvise was unavailable at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fadvise(
+  JNIEnv *env, jclass clazz,
+  jint fd, jlong offset, jlong len, jint flags)
+{
+  (void)env;
+  (void)clazz;
+#ifndef HAVE_POSIX_FADVISE
+  errno = ENOSYS;
+  return -1;
+#else
+  int rc = posix_fadvise(fd, (off_t)offset, (off_t)len, flags);
+  if (rc != 0) {
+    errno = rc;
+  }
+  return rc;
+#endif
+}
+
+/*
+ * JNI implementation of NativeIO's native posix_fallocate(int fd, long offset,
+ * long len).
+ *
+ * posix_fallocate(3) returns an error number on failure and does not set
+ * errno. The error number is copied into errno so that errno is meaningful on
+ * every failure path, matching the ENOSYS stub; the return value itself is
+ * unchanged.
+ *
+ * Returns 0 on success, an error number on failure, or -1 with errno=ENOSYS
+ * when posix_fallocate was unavailable at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fallocate(
+  JNIEnv *env, jclass clazz,
+  jint fd, jlong offset, jlong len)
+{
+  (void)env;
+  (void)clazz;
+#ifndef HAVE_POSIX_FALLOCATE
+  errno = ENOSYS;
+  return -1;
+#else
+  int rc = posix_fallocate(fd, (off_t)offset, (off_t)len);
+  if (rc != 0) {
+    errno = rc;
+  }
+  return rc;
+#endif
+}


Mime
View raw message