activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [4/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring
Date Thu, 30 Jul 2015 09:14:26 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
new file mode 100644
index 0000000..b6fcdd3
--- /dev/null
+++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _GNU_SOURCE
+// libaio, O_DIRECT and other things won't be available without this define
+#define _GNU_SOURCE
+#endif
+
+//#define DEBUG
+
+#include <jni.h>
+#include <unistd.h>
+#include <errno.h>
+#include <libaio.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h"
+#include "exception_helper.h"
+
+/**
+ * Native state backing one libaio context. newContext allocates it and hands it to Java
+ * wrapped in a direct ByteBuffer; deleteContext releases it.
+ */
+struct io_control {
+    // libaio queue handle, created by io_queue_init in newContext
+    io_context_t ioContext;
+    // array of queueSize entries that io_getevents fills during poll/blockedPoll
+    struct io_event * events;
+
+    // global reference to the object newContext was called on; blockedPoll calls done(SubmitInfo) on it
+    jobject thisObject;
+
+    // This is used to make sure we don't return IOCB while something else is using them
+    // this is to guarantee the submits could be done concurrently with polling
+    pthread_mutex_t iocbLock;
+
+    // held by blockedPoll for as long as it runs; deleteContext acquires it to wait for the poller to finish
+    pthread_mutex_t pollLock;
+
+    // a reusable pool of iocb, kept as a ring of queueSize slots:
+    // getIOCB reads at iocbGet, putIOCB writes at iocbPut, used counts the iocbs currently handed out
+    struct iocb ** iocb;
+    int queueSize;
+    int iocbPut;
+    int iocbGet;
+    int used;
+};
+
+// Class and method handles looked up once in JNI_OnLoad and released in JNI_OnUnload.
+jclass submitClass = NULL;          // org.apache.activemq.artemis.jlibaio.SubmitInfo
+jmethodID errorMethod = NULL;       // SubmitInfo.onError(int, String)
+jmethodID doneMethod = NULL;        // SubmitInfo.done()
+jmethodID libaioContextDone = NULL; // LibaioContext.done(SubmitInfo)
+
+jclass libaioContextClass = NULL;    // org.apache.activemq.artemis.jlibaio.LibaioContext
+jclass runtimeExceptionClass = NULL; // java.lang.RuntimeException
+jclass ioExceptionClass = NULL;      // java.io.IOException
+
+// util methods
+/** Raises a java.lang.RuntimeException with the given message in the calling Java thread. */
+void throwRuntimeException(JNIEnv* env, char* message) {
+    jclass exceptionType = runtimeExceptionClass;
+    (*env)->ThrowNew(env, exceptionType, message);
+}
+
+/** Raises a RuntimeException whose text is `message` followed by the strerror text of errorNumber. */
+void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) {
+    // exceptionMessage returns heap memory that this function owns and must free
+    char* fullText = exceptionMessage(message, errorNumber);
+    jclass exceptionType = runtimeExceptionClass;
+    (*env)->ThrowNew(env, exceptionType, fullText);
+    free(fullText);
+}
+
+/** Raises a java.io.IOException with the given message in the calling Java thread. */
+void throwIOException(JNIEnv* env, char* message) {
+    jclass exceptionType = ioExceptionClass;
+    (*env)->ThrowNew(env, exceptionType, message);
+}
+
+/** Raises an IOException whose text is `message` followed by the strerror text of errorNumber. */
+void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) {
+    // exceptionMessage returns heap memory that this function owns and must free
+    char* fullText = exceptionMessage(message, errorNumber);
+    jclass exceptionType = ioExceptionClass;
+    (*env)->ThrowNew(env, exceptionType, fullText);
+    free(fullText);
+}
+
+/**
+ * Raises java.lang.OutOfMemoryError in the calling Java thread.
+ * When the class lookup itself fails, FindClass has already left an exception pending,
+ * so nothing else is thrown.
+ */
+void throwOutOfMemoryError(JNIEnv* env) {
+    jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError");
+    if (exceptionClass == NULL) {
+        // pending exception from FindClass; ThrowNew must not be called with a NULL class
+        return;
+    }
+    (*env)->ThrowNew(env, exceptionClass, "");
+    (*env)->DeleteLocalRef(env, exceptionClass);
+}
+
+/**
+ * Builds the text `msg` immediately followed by strerror(error) in a newly allocated buffer.
+ * Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char.
+ *
+ * @param msg   prefix text, must not be NULL
+ * @param error errno-style code; a negative value is treated as its positive counterpart
+ * @return the allocated string, or NULL when the allocation fails
+ */
+char* exceptionMessage(char* msg, int error) {
+    if (error < 0) {
+        // some functions return negative values
+        // and it's hard to keep track of when to send -error and when not
+        // so a forgotten negation is corrected here
+        error = -error;
+    }
+    // strerror returns storage it owns, so there is nothing to free for it
+    const char* err = strerror(error);
+    size_t msgLength = strlen(msg);
+    size_t errLength = strlen(err);
+
+    char* result = malloc(msgLength + errLength + 1);
+    if (result == NULL) {
+        // callers hand the result to ThrowNew and free(); NOTE(review): both are expected to accept NULL — confirm for ThrowNew
+        return NULL;
+    }
+    memcpy(result, msg, msgLength);
+    // errLength + 1 also copies the terminating NUL
+    memcpy(result + msgLength, err, errLength + 1);
+    return result;
+}
+
+/**
+ * Looks up a class by its JNI name and returns a global reference to it.
+ * Returns NULL with a Java exception pending when the lookup or the reference creation fails.
+ */
+static jclass findGlobalClass(JNIEnv* env, const char* name) {
+    jclass localClass = (*env)->FindClass(env, name);
+    if (localClass == NULL) {
+        // pending exception...
+        return NULL;
+    }
+    jclass globalClass = (jclass) (*env)->NewGlobalRef(env, localClass);
+    (*env)->DeleteLocalRef(env, localClass);
+    if (globalClass == NULL) {
+        // out-of-memory!
+        throwOutOfMemoryError(env);
+    }
+    return globalClass;
+}
+
+/**
+ * Caches the classes and method IDs the native code uses.
+ * Classes are pinned with global references. Method IDs are not object references, so they
+ * are stored exactly as GetMethodID returns them and must never be passed to NewGlobalRef.
+ *
+ * @return JNI_VERSION_1_6 on success, JNI_ERR when any lookup fails
+ */
+jint JNI_OnLoad(JavaVM* vm, void* reserved) {
+    JNIEnv* env;
+    if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
+        return JNI_ERR;
+    }
+
+    runtimeExceptionClass = findGlobalClass(env, "java/lang/RuntimeException");
+    if (runtimeExceptionClass == NULL) {
+        return JNI_ERR;
+    }
+
+    ioExceptionClass = findGlobalClass(env, "java/io/IOException");
+    if (ioExceptionClass == NULL) {
+        return JNI_ERR;
+    }
+
+    submitClass = findGlobalClass(env, "org/apache/activemq/artemis/jlibaio/SubmitInfo");
+    if (submitClass == NULL) {
+        return JNI_ERR;
+    }
+
+    errorMethod = (*env)->GetMethodID(env, submitClass, "onError", "(ILjava/lang/String;)V");
+    if (errorMethod == NULL) {
+        return JNI_ERR;
+    }
+
+    doneMethod = (*env)->GetMethodID(env, submitClass, "done", "()V");
+    if (doneMethod == NULL) {
+        return JNI_ERR;
+    }
+
+    libaioContextClass = findGlobalClass(env, "org/apache/activemq/artemis/jlibaio/LibaioContext");
+    if (libaioContextClass == NULL) {
+        return JNI_ERR;
+    }
+
+    libaioContextDone = (*env)->GetMethodID(env, libaioContextClass, "done", "(Lorg/apache/activemq/artemis/jlibaio/SubmitInfo;)V");
+    if (libaioContextDone == NULL) {
+        return JNI_ERR;
+    }
+
+    return JNI_VERSION_1_6;
+}
+
+/**
+ * Drops the global class references taken in JNI_OnLoad so the classes can be collected,
+ * and clears the cached method IDs (they are plain handles and need no release call).
+ */
+void JNI_OnUnload(JavaVM* vm, void* reserved) {
+    JNIEnv* env;
+    if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
+        // Something is wrong but nothing we can do about this :(
+        return;
+    }
+
+    // delete global references so the GC can collect them
+    if (runtimeExceptionClass != NULL) {
+        (*env)->DeleteGlobalRef(env, runtimeExceptionClass);
+        runtimeExceptionClass = NULL;
+    }
+    if (ioExceptionClass != NULL) {
+        (*env)->DeleteGlobalRef(env, ioExceptionClass);
+        ioExceptionClass = NULL;
+    }
+    if (submitClass != NULL) {
+        (*env)->DeleteGlobalRef(env, (jobject)submitClass);
+        submitClass = NULL;
+    }
+    if (libaioContextClass != NULL) {
+        (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass);
+        libaioContextClass = NULL;
+    }
+
+    // the method IDs belonged to the classes released above
+    errorMethod = NULL;
+    doneMethod = NULL;
+    libaioContextDone = NULL;
+}
+
+/**
+ * Resolves the io_control stored behind the direct buffer `pointer`.
+ * Raises RuntimeException and returns NULL when the buffer yields no native address.
+ */
+static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
+    void * address = (*env)->GetDirectBufferAddress(env, pointer);
+    if (address != NULL) {
+        return (struct io_control *) address;
+    }
+    throwRuntimeException(env, "Controller not initialized");
+    return NULL;
+}
+
+/**
+ * Takes the next iocb out of the pool.
+ * Returns NULL when all queueSize iocbs are already handed out.
+ */
+static inline struct iocb * getIOCB(struct io_control * control) {
+    struct iocb * taken = 0;
+
+    pthread_mutex_lock(&control->iocbLock);
+
+    #ifdef DEBUG
+       fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
+    #endif
+
+    if (control->used < control->queueSize) {
+        taken = control->iocb[control->iocbGet];
+        control->used += 1;
+
+        // advance the read index, wrapping around at the end of the ring
+        control->iocbGet += 1;
+        if (control->iocbGet >= control->queueSize) {
+           control->iocbGet = 0;
+        }
+    }
+
+    pthread_mutex_unlock(&control->iocbLock);
+    return taken;
+}
+
+/**
+ * Returns an iocb to the pool so a later getIOCB can hand it out again.
+ */
+static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) {
+    pthread_mutex_lock(&control->iocbLock);
+
+    #ifdef DEBUG
+       fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
+    #endif
+
+    control->used -= 1;
+    control->iocb[control->iocbPut] = iocbBack;
+
+    // advance the write index, wrapping around at the end of the ring
+    control->iocbPut += 1;
+    if (control->iocbPut >= control->queueSize) {
+       control->iocbPut = 0;
+    }
+
+    pthread_mutex_unlock(&control->iocbLock);
+}
+
+/** Returns whatever GetDirectBufferAddress reports for the given buffer object. */
+static inline void * getBuffer(JNIEnv* env, jobject pointer) {
+    void * address = (*env)->GetDirectBufferAddress(env, pointer);
+    return address;
+}
+
+/** Attempts an exclusive, non-blocking flock on the descriptor; true when flock succeeded. */
+JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_lock
+  (JNIEnv * env, jclass  clazz, jint handle) {
+    int rc = flock(handle, LOCK_EX | LOCK_NB);
+    return rc == 0;
+}
+
+/**
+ * Creates the native state for a libaio context with room for queueSize in-flight requests.
+ * Everything that is allocated here is released by deleteContext.
+ * On any failure a Java exception is raised, everything created so far is released
+ * and NULL is returned.
+ *
+ * @return a direct ByteBuffer wrapping the struct io_control, or NULL with an exception pending
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newContext(JNIEnv* env, jobject thisObject, jint queueSize) {
+    io_context_t libaioContext;
+    struct iocb ** iocb = NULL;
+    struct io_event * events = NULL;
+    struct io_control * theControl = NULL;
+    jobject handle = NULL;
+    int iocbLockReady = 0;
+    int pollLockReady = 0;
+    int i = 0;
+
+    #ifdef DEBUG
+        fprintf (stdout, "Initializing context\n");
+    #endif
+
+    int res = io_queue_init(queueSize, &libaioContext);
+    if (res) {
+        // the queue was not created, so there is nothing to release yet
+        throwRuntimeExceptionErrorNo(env, "Cannot initialize queue:", res);
+        return NULL;
+    }
+
+    // calloc leaves every slot NULL so the cleanup below can free a partially filled pool
+    iocb = (struct iocb **)calloc((size_t)queueSize, sizeof(struct iocb *));
+    if (iocb == NULL) {
+        throwOutOfMemoryError(env);
+        goto failed;
+    }
+
+    for (i = 0; i < queueSize; i++) {
+        iocb[i] = (struct iocb *)malloc(sizeof(struct iocb));
+        if (iocb[i] == NULL) {
+            throwOutOfMemoryError(env);
+            goto failed;
+        }
+    }
+
+    events = (struct io_event *)malloc(sizeof(struct io_event) * (size_t)queueSize);
+    if (events == NULL) {
+        throwOutOfMemoryError(env);
+        goto failed;
+    }
+
+    theControl = (struct io_control *) malloc(sizeof(struct io_control));
+    if (theControl == NULL) {
+        throwOutOfMemoryError(env);
+        goto failed;
+    }
+
+    res = pthread_mutex_init(&(theControl->iocbLock), 0);
+    if (res) {
+        throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res);
+        goto failed;
+    }
+    iocbLockReady = 1;
+
+    res = pthread_mutex_init(&(theControl->pollLock), 0);
+    if (res) {
+        throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res);
+        goto failed;
+    }
+    pollLockReady = 1;
+
+    theControl->ioContext = libaioContext;
+    theControl->events = events;
+    theControl->iocb = iocb;
+    theControl->queueSize = queueSize;
+    theControl->iocbPut = 0;
+    theControl->iocbGet = 0;
+    theControl->used = 0;
+    theControl->thisObject = (*env)->NewGlobalRef(env, thisObject);
+    if (theControl->thisObject == NULL) {
+        throwOutOfMemoryError(env);
+        goto failed;
+    }
+
+    handle = (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control));
+    if (handle == NULL) {
+        // NewDirectByteBuffer has left an exception pending
+        (*env)->DeleteGlobalRef(env, theControl->thisObject);
+        goto failed;
+    }
+
+    return handle;
+
+failed:
+    // release in reverse order of creation; each step tolerates the parts that were never created
+    if (pollLockReady) {
+        pthread_mutex_destroy(&(theControl->pollLock));
+    }
+    if (iocbLockReady) {
+        pthread_mutex_destroy(&(theControl->iocbLock));
+    }
+    free(theControl);
+    free(events);
+    if (iocb != NULL) {
+        for (i = 0; i < queueSize; i++) {
+            free(iocb[i]);
+        }
+        free(iocb);
+    }
+    // the context is a kernel handle: it is released with io_queue_release, never with free()
+    io_queue_release(libaioContext);
+    return NULL;
+}
+
+/**
+ * Tears down a context created by newContext and frees everything it owns.
+ * The statement order matters: the queue is released first, then pollLock is
+ * acquired and dropped to wait for a running blockedPoll before memory is freed.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject contextPointer) {
+    int i;
+    struct io_control * theControl = getIOControl(env, contextPointer);
+    if (theControl == NULL) {
+      // getIOControl has already raised a RuntimeException
+      return;
+    }
+
+    // NOTE(review): presumably this makes a blocked io_getevents in blockedPoll fail so its loop exits — confirm
+    io_queue_release(theControl->ioContext);
+
+    // to make sure the poll has finished
+    pthread_mutex_lock(&(theControl->pollLock));
+    pthread_mutex_unlock(&(theControl->pollLock));
+
+    pthread_mutex_destroy(&(theControl->pollLock));
+    pthread_mutex_destroy(&(theControl->iocbLock));
+
+    // Releasing each individual iocb
+    for (i = 0; i < theControl->queueSize; i++) {
+       free(theControl->iocb[i]);
+    }
+
+    // drops the reference taken on the Java object in newContext
+    (*env)->DeleteGlobalRef(env, theControl->thisObject);
+
+    free(theControl->iocb);
+    free(theControl->events);
+    free(theControl);
+}
+
+/** Closes the descriptor and raises IOException with the errno text when close() fails. */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) {
+   int rc = close(fd);
+   if (rc >= 0) {
+       return;
+   }
+   throwIOExceptionErrorNo(env, "Error closing file:", errno);
+}
+
+/**
+ * Opens (creating if needed) the file at `path` for reading and writing, with O_DIRECT when
+ * `direct` is set.
+ *
+ * @return the file descriptor; a negative value with an exception pending on failure
+ */
+JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_open(JNIEnv* env, jclass clazz,
+                        jstring path, jboolean direct) {
+    const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
+    if (f_path == NULL) {
+        // GetStringUTFChars could not produce the characters and has left an exception pending
+        return -1;
+    }
+
+    int flags = O_RDWR | O_CREAT;
+    if (direct) {
+      flags |= O_DIRECT;
+    }
+
+    int res = open(f_path, flags, 0666);
+    // remember errno right away: the JNI call below may overwrite it
+    int openErrno = errno;
+
+    (*env)->ReleaseStringUTFChars(env, path, f_path);
+
+    if (res < 0) {
+       throwIOExceptionErrorNo(env, "Cannot open file:", openErrno);
+    }
+
+    return res;
+}
+
+/**
+ * Hands one prepared iocb to the kernel queue.
+ * On failure the callback's global reference is dropped, the iocb goes back to the pool
+ * and an IOException is raised.
+ */
+static inline void submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
+    int result = io_submit(theControl->ioContext, 1, &iocb);
+    if (result >= 0) {
+        return;
+    }
+
+    // Putting the Global Ref and IOCB back in case of a failure
+    jobject callbackRef = (jobject)iocb->data;
+    (*env)->DeleteGlobalRef(env, callbackRef);
+    putIOCB(theControl, iocb);
+
+    throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
+}
+
+/**
+ * Queues an asynchronous write of `size` bytes from `bufferWrite` to `fileHandle` at `position`.
+ * `callback` is pinned with a global reference until the request is polled.
+ * Raises RuntimeException when the buffer has no native address, and IOException when the
+ * queue is full or the submission fails.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite
+  (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) {
+    struct io_control * theControl = getIOControl(env, contextPointer);
+    if (theControl == NULL) {
+      return;
+    }
+
+    #ifdef DEBUG
+       fprintf (stdout, "submitWrite position %ld, size %d\n", position, size);
+    #endif
+
+    // checked before an iocb is taken so nothing has to be handed back on this path
+    void * buffer = getBuffer(env, bufferWrite);
+    if (buffer == NULL) {
+        throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+        return;
+    }
+
+    struct iocb * iocb = getIOCB(theControl);
+
+    if (iocb == NULL) {
+        throwIOException(env, "Not enough space in libaio queue");
+        return;
+    }
+
+    io_prep_pwrite(iocb, fileHandle, buffer, (size_t)size, position);
+
+    // The GlobalRef will be deleted when poll is called. this is done so
+    // the vm wouldn't crash if the Callback passed by the user is GCed between submission
+    // and callback.
+    // also as the real intention is to hold the reference until the life cycle is complete
+    iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
+
+    submit(env, theControl, iocb);
+}
+
+/**
+ * Queues an asynchronous read of `size` bytes from `fileHandle` at `position` into `bufferRead`.
+ * `callback` is pinned with a global reference until the request is polled.
+ * Raises RuntimeException when the buffer has no native address, and IOException when the
+ * queue is full or the submission fails.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead
+  (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferRead, jobject callback) {
+    struct io_control * theControl = getIOControl(env, contextPointer);
+    if (theControl == NULL) {
+      return;
+    }
+
+    // checked before an iocb is taken so nothing has to be handed back on this path
+    void * buffer = getBuffer(env, bufferRead);
+    if (buffer == NULL) {
+        throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+        return;
+    }
+
+    struct iocb * iocb = getIOCB(theControl);
+
+    if (iocb == NULL) {
+        throwIOException(env, "Not enough space in libaio queue");
+        return;
+    }
+
+    io_prep_pread(iocb, fileHandle, buffer, (size_t)size, position);
+
+    // The GlobalRef will be deleted when poll is called. this is done so
+    // the vm wouldn't crash if the Callback passed by the user is GCed between submission
+    // and callback.
+    // also as the real intention is to hold the reference until the life cycle is complete
+    iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
+
+    submit(env, theControl, iocb);
+}
+
+/**
+ * Blocks in io_getevents and dispatches completions until io_getevents reports an error.
+ * For each event: a failed request first gets onError(errno, text) on its callback, then
+ * the iocb goes back to the pool and done(callback) is invoked on the context object.
+ * pollLock is held for the whole run so deleteContext can wait for this loop to finish.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
+  (JNIEnv * env, jobject thisObject, jobject contextPointer) {
+
+    #ifdef DEBUG
+       fprintf (stdout, "Running blockedPoll");
+    #endif
+
+    int i;
+    struct io_control * theControl = getIOControl(env, contextPointer);
+    if (theControl == NULL) {
+      return;
+    }
+    int max = theControl->queueSize;
+    pthread_mutex_lock(&(theControl->pollLock));
+
+    for (;;) {
+
+        int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
+
+        if (result < 0)
+        {
+            #ifdef DEBUG
+               fprintf (stdout, "finished blockedPoll rutine with result=%d\n", result);
+            #endif
+            break;
+        }
+        #ifdef DEBUG
+           fprintf (stdout, "blockedPoll returned %d events\n", result);
+        #endif
+
+        for (i = 0; i < result; i++)
+        {
+            #ifdef DEBUG
+               fprintf (stdout, "blockedPoll treading event %d\n", i);
+            #endif
+            struct io_event * event = &(theControl->events[i]);
+            struct iocb * iocbp = event->obj;
+            int eventResult = (int)event->res;
+
+            #ifdef DEBUG
+                fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
+            #endif
+
+            if (eventResult < 0) {
+                #ifdef DEBUG
+                    fprintf (stdout, "Error: %s\n", strerror(-eventResult));
+                #endif
+
+                if (iocbp->data != NULL) {
+                    jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
+
+                    (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
+
+                    // this native frame does not return to Java while polling, so local
+                    // references are not reclaimed automatically and must be dropped by hand
+                    if (jstrError != NULL) {
+                        (*env)->DeleteLocalRef(env, jstrError);
+                    }
+                }
+            }
+
+            jobject obj = (jobject)iocbp->data;
+            putIOCB(theControl, iocbp);
+
+            if (obj != NULL) {
+                (*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj);
+                // We delete the globalRef after the completion of the callback
+                (*env)->DeleteGlobalRef(env, obj);
+            }
+
+        }
+    }
+
+    pthread_mutex_unlock(&(theControl->pollLock));
+
+}
+
+/**
+ * Collects between `min` and `max` completed requests and stores their callbacks in `callbacks`.
+ * A failed request gets onError(errno, text) on its callback before being stored.
+ * `min` and `max` are capped at the context's queueSize, which is the capacity of the
+ * event array io_getevents writes into.
+ *
+ * @return the value returned by io_getevents: the number of events, or a negative error code
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_poll
+  (JNIEnv * env, jobject obj, jobject contextPointer, jobjectArray callbacks, jint min, jint max) {
+    int i = 0;
+    struct io_control * theControl = getIOControl(env, contextPointer);
+    if (theControl == NULL) {
+      return 0;
+    }
+
+    // theControl->events only has room for queueSize entries
+    if (max > theControl->queueSize) {
+        max = theControl->queueSize;
+    }
+    if (min > theControl->queueSize) {
+        min = theControl->queueSize;
+    }
+
+    int result = io_getevents(theControl->ioContext, min, max, theControl->events, 0);
+    int retVal = result;
+
+    for (i = 0; i < result; i++) {
+        struct io_event * event = &(theControl->events[i]);
+        struct iocb * iocbp = event->obj;
+        int eventResult = (int)event->res;
+
+        #ifdef DEBUG
+            fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
+        #endif
+
+        if (eventResult < 0) {
+            #ifdef DEBUG
+                fprintf (stdout, "Error: %s\n", strerror(-eventResult));
+            #endif
+
+            // same guard as blockedPoll: there is nothing to notify without a callback
+            if (iocbp->data != NULL) {
+                jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
+
+                (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
+
+                // one string per failed event: release it so a large batch cannot exhaust local references
+                if (jstrError != NULL) {
+                    (*env)->DeleteLocalRef(env, jstrError);
+                }
+            }
+        }
+
+        (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
+
+        if (iocbp->data != NULL) {
+            // We delete the globalRef after the completion of the callback
+            (*env)->DeleteGlobalRef(env, (jobject)iocbp->data);
+        }
+
+        putIOCB(theControl, iocbp);
+    }
+
+    return retVal;
+}
+
+/**
+ * Allocates a zero-filled native buffer of `size` bytes aligned to `alignment` and wraps it
+ * in a direct ByteBuffer. `size` must be a multiple of `alignment`.
+ * Buffers created here live outside Java's GC-managed memory and must be released with
+ * freeBuffer, or they leak on the process heap.
+ *
+ * @return the direct ByteBuffer, or NULL with an exception pending
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newAlignedBuffer
+(JNIEnv * env, jclass clazz, jint size, jint alignment) {
+    // rejected first: the modulo below would divide by zero
+    if (alignment <= 0) {
+        throwRuntimeException(env, "Buffer alignment needs to be a positive number");
+        return NULL;
+    }
+
+    if (size % alignment != 0) {
+        throwRuntimeException(env, "Buffer size needs to be aligned to passed argument");
+        return NULL;
+    }
+
+    void * buffer;
+    int result = posix_memalign(&buffer, (size_t)alignment, (size_t)size);
+
+    if (result) {
+        throwRuntimeExceptionErrorNo(env, "Can't allocate posix buffer:", result);
+        return NULL;
+    }
+
+    memset(buffer, 0, (size_t)size);
+
+    jobject wrapped = (*env)->NewDirectByteBuffer(env, buffer, size);
+    if (wrapped == NULL) {
+        // nobody else knows about the allocation, so release it here
+        free(buffer);
+    }
+    return wrapped;
+}
+
+/** Frees the native memory behind a direct buffer; raises RuntimeException for a null buffer object. */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_freeBuffer
+  (JNIEnv * env, jclass clazz, jobject jbuffer) {
+    if (jbuffer != NULL) {
+        void * nativeAddress = (*env)->GetDirectBufferAddress(env, jbuffer);
+        free(nativeAddress);
+        return;
+    }
+    throwRuntimeException(env, "Null pointer");
+}
+
+
+/**
+ * Returns the EXPECTED_NATIVE_VERSION constant. It does no other work, so a successful
+ * call mainly shows that the library and its binary dependencies were loaded.
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getNativeVersion
+  (JNIEnv * env, jclass clazz)
+{
+     jint version = org_apache_activemq_artemis_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION;
+     return version;
+}
+
+/** Returns st_size of the open descriptor; raises IOException and returns -1 when fstat fails. */
+JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getSize
+  (JNIEnv * env, jclass clazz, jint fd)
+{
+    struct stat fileInfo;
+    int rc = fstat(fd, &fileInfo);
+
+    if (rc >= 0)
+    {
+        return fileInfo.st_size;
+    }
+
+    throwIOExceptionErrorNo(env, "Cannot determine file size:", errno);
+    return -1l;
+}
+
+/** Returns st_blksize of the open descriptor; raises IOException and returns -1 when fstat fails. */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSizeFD
+  (JNIEnv * env, jclass clazz, jint fd)
+{
+    struct stat fileInfo;
+    int rc = fstat(fd, &fileInfo);
+
+    if (rc >= 0)
+    {
+        return fileInfo.st_blksize;
+    }
+
+    throwIOExceptionErrorNo(env, "Cannot determine file size:", errno);
+    return -1l;
+}
+
+/**
+ * Returns st_blksize for the file at `path`.
+ * Raises IOException and returns -1 when stat fails; the path characters are released
+ * on every path.
+ */
+JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSize
+  (JNIEnv * env, jclass clazz, jstring path)
+{
+    const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
+    if (f_path == NULL)
+    {
+        // GetStringUTFChars could not produce the characters and has left an exception pending
+        return -1;
+    }
+
+    struct stat statBuffer;
+    int res = stat(f_path, &statBuffer);
+    // remember errno right away: the JNI call below may overwrite it
+    int statErrno = errno;
+
+    (*env)->ReleaseStringUTFChars(env, path, f_path);
+
+    if (res < 0)
+    {
+        throwIOExceptionErrorNo(env, "Cannot determine file size:", statErrno);
+        return -1;
+    }
+
+    return (jint) statBuffer.st_blksize;
+}
+
+/**
+ * Preallocates `size` bytes for the descriptor starting at offset 0, then syncs the file
+ * and moves the file offset back to the start.
+ * Raises IOException and stops when the preallocation fails.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fallocate
+  (JNIEnv * env, jclass clazz, jint fd, jlong size)
+{
+    if (fallocate(fd, 0, 0, (off_t) size) < 0)
+    {
+        throwIOExceptionErrorNo(env, "Could not preallocate file", errno);
+        // nothing was allocated, so there is nothing to sync
+        return;
+    }
+    fsync(fd);
+    lseek (fd, 0, SEEK_SET);
+}
+
+/**
+ * Writes `size` zero bytes at the start of the file and leaves the file offset at 0.
+ * Raises OutOfMemoryError when the temporary buffer cannot be allocated and IOException
+ * when the data cannot be written completely.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill
+  (JNIEnv * env, jclass clazz, jint fd, jlong size)
+{
+    void * preAllocBuffer = 0;
+    if (posix_memalign(&preAllocBuffer, 512, (size_t) size) != 0)
+    {
+        throwOutOfMemoryError(env);
+        return;
+    }
+    memset(preAllocBuffer, 0, (size_t) size);
+    lseek (fd, 0, SEEK_SET);
+
+    // write() may store fewer bytes than requested, so keep going until everything is out
+    const char * cursor = (const char *) preAllocBuffer;
+    size_t remaining = (size_t) size;
+    while (remaining > 0)
+    {
+        ssize_t written = write(fd, cursor, remaining);
+        if (written < 0 && errno == EINTR)
+        {
+            // interrupted before anything was written: just retry
+            continue;
+        }
+        if (written <= 0)
+        {
+            // a zero-byte result would loop forever, so it is reported as an I/O error
+            int writeErrno = (written < 0) ? errno : EIO;
+            free (preAllocBuffer);
+            throwIOExceptionErrorNo(env, "Cannot fill file:", writeErrno);
+            return;
+        }
+        cursor += written;
+        remaining -= (size_t) written;
+    }
+
+    lseek (fd, 0, SEEK_SET);
+    free (preAllocBuffer);
+}
+
+/**
+ * Zeroes the first `size` bytes of a direct buffer.
+ * Raises RuntimeException when the buffer has no native address.
+ */
+JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer
+  (JNIEnv *env, jclass clazz, jobject jbuffer, jint size)
+{
+    #ifdef DEBUG
+        fprintf (stdout, "Mem setting buffer with %d bytes\n", size);
+    #endif
+    void * nativeAddress = (*env)->GetDirectBufferAddress(env, jbuffer);
+
+    if (nativeAddress != 0)
+    {
+        memset(nativeAddress, 0, (size_t)size);
+        return;
+    }
+
+    throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java b/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java
deleted file mode 100644
index 51f5da7..0000000
--- a/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.libaio;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-public class Native
-{
-   // Functions used for locking files .....
-   public static native int openFile(String fileName);
-
-   public static native void closeFile(int handle);
-
-   public static native boolean flock(int handle);
-   // Functions used for locking files ^^^^^^^^
-
-   public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
-
-   public static native void destroyBuffer(ByteBuffer buffer);
-
-   public static native ByteBuffer newNativeBuffer(long size);
-
-   public static native void newInit(Class someClass);
-
-   public static native ByteBuffer init(Class controllerClass, String fileName, int maxIO, Object logger) throws ActiveMQException;
-
-   public static native long size0(ByteBuffer handle);
-
-   public static native void write(Object thisObject, ByteBuffer handle,
-                             long sequence,
-                             long position,
-                             long size,
-                             ByteBuffer buffer,
-                             Object aioPackageCallback) throws ActiveMQException;
-
-   /** a direct write to the file without the use of libaio's submit. */
-   public static native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException;
-
-   /**
-    *This is using org.apache.activemq.artemis.core.asyncio.AIOCallback
-     */
-   public static native void read(Object thisObject, ByteBuffer handle, long position, long size, ByteBuffer buffer, Object aioPackageCallback) throws ActiveMQException;
-
-   public static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws ActiveMQException;
-
-   public static native void closeInternal(ByteBuffer handler);
-
-   public static native void stopPoller(ByteBuffer handler);
-
-   /** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
-   public static native int getNativeVersion();
-
-   /** Poll asynchronous events from internal queues */
-   public static native void internalPollEvents(ByteBuffer handler);
-
-   // Inner classes ---------------------------------------------------------------------
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
new file mode 100644
index 0000000..8b45f54
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jlibaio;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+
+/**
+ * This class is used as an aggregator for the {@link LibaioFile}.
+ * <br>
+ * It holds native data, and it will share a libaio queue that can be used by multiple files.
+ * <br>
+ * You need to use the poll methods to read the result of write and read submissions.
+ * <br>
+ * You also need to use the special buffer created by {@link LibaioFile} as you need special alignments
+ * when dealing with O_DIRECT files.
+ * <br>
+ * A single controller can serve multiple files. There's no need to create one controller per file.
+ * <br>
+ * <a href="https://ext4.wiki.kernel.org/index.php/Clarifying_Direct_IO's_Semantics">Interesting reading for this.</a>
+ */
+public class LibaioContext <Callback extends SubmitInfo> implements Closeable
+{
+
+   private static final AtomicLong totalMaxIO = new AtomicLong(0);
+
+   /**
+    * This definition needs to match Version.h on the native sources.
+    * <br>
+    * Or else the native module won't be loaded because of version mismatches
+    */
+   private static final int EXPECTED_NATIVE_VERSION = 1;
+
+   private static boolean loaded = false;
+
+   public static boolean isLoaded()
+   {
+      return loaded;
+   }
+
+   private static boolean loadLibrary(final String name)
+   {
+      try
+      {
+         System.loadLibrary(name);
+         if (getNativeVersion() != EXPECTED_NATIVE_VERSION)
+         {
+            NativeLogger.LOGGER.incompatibleNativeLibrary();
+            return false;
+         }
+         else
+         {
+            return true;
+         }
+      }
+      catch (Throwable e)
+      {
+         NativeLogger.LOGGER.debug(name + " -> error loading the native library", e);
+         return false;
+      }
+
+   }
+
+   static
+   {
+      String[] libraries = new String[]{"artemis-native-64", "artemis-native-32"};
+
+      for (String library : libraries)
+      {
+         if (loadLibrary(library))
+         {
+            loaded = true;
+            break;
+         }
+         else
+         {
+            NativeLogger.LOGGER.debug("Library " + library + " not found!");
+         }
+      }
+
+      if (!loaded)
+      {
+         NativeLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper");
+      }
+   }
+
+   /**
+    * This is used to validate leaks on tests.
+    * @return the number of allocated aio, to be used on test checks.
+    */
+   public static long getTotalMaxIO()
+   {
+      return totalMaxIO.get();
+   }
+
+   /**
+    * It will reset all the positions on the buffer to 0, using memset.
+    * @param buffer a native buffer.
+    */
+   public void memsetBuffer(ByteBuffer buffer)
+   {
+      memsetBuffer(buffer, buffer.limit());
+   }
+
+   /**
+    * This is used on tests validating for leaks.
+    */
+   public static void resetMaxAIO()
+   {
+      totalMaxIO.set(0);
+   }
+
+   /**
+    * the native ioContext including the structure created.
+    */
+   private final ByteBuffer ioContext;
+
+   private final AtomicBoolean closed = new AtomicBoolean(false);
+
+   final Semaphore ioSpace;
+
+   final int queueSize;
+
+   /**
+    * The queue size here will use resources defined on the kernel parameter
+    * <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
+    *
+    * @param queueSize the size to be initialized on libaio
+    *                  io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
+    * @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
+    */
+   public LibaioContext(int queueSize, boolean useSemaphore)
+   {
+      try
+      {
+         this.ioContext = newContext(queueSize);
+      }
+      catch (Exception e)
+      {
+         throw e;
+      }
+      this.queueSize = queueSize;
+      totalMaxIO.addAndGet(queueSize);
+      if (useSemaphore)
+      {
+         this.ioSpace = new Semaphore(queueSize);
+      }
+      else
+      {
+         this.ioSpace = null;
+      }
+   }
+
+
+   /**
+    * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}
+    * @param fd the file descriptor
+    * @param position the write position
+    * @param size number of bytes to use
+    * @param bufferWrite the native buffer
+    * @param callback a callback
+    * @throws IOException in case of error
+    */
+   public void submitWrite(int fd,long position, int size,
+                              ByteBuffer bufferWrite, Callback callback) throws IOException
+   {
+      try
+      {
+         if (ioSpace != null)
+         {
+            ioSpace.acquire();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         throw new IOException(e.getMessage(), e);
+      }
+      submitWrite(fd, this.ioContext, position, size, bufferWrite, callback);
+   }
+
+   public void submitRead(int fd, long position, int size, ByteBuffer bufferWrite,
+                          Callback callback) throws IOException
+   {
+      try
+      {
+         if (ioSpace != null)
+         {
+            ioSpace.acquire();
+         }
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         throw new IOException(e.getMessage(), e);
+      }
+      submitRead(fd, this.ioContext, position, size, bufferWrite, callback);
+   }
+
+
+   /**
+    * This is used to close the libaio queues and cleanup the native data used.
+    * <br>
+    * It is unsafe to close the controller while you have pending writes or files open as
+    * this could cause core dumps or VM crashes.
+    */
+   @Override
+   public void close()
+   {
+      if (!closed.getAndSet(true))
+      {
+         totalMaxIO.addAndGet(-queueSize);
+
+         if (ioContext != null)
+         {
+            deleteContext(ioContext);
+         }
+      }
+   }
+
+   @Override
+   protected void finalize() throws Throwable
+   {
+      super.finalize();
+      close();
+   }
+
+   /**
+    * It will open a file. If you set the direct flag = false then you won't need to use the special buffer.
+    * Notice: This will create an empty file if the file doesn't already exist.
+    *
+    * @param file the file to be open.
+    * @param direct will set O_DIRECT.
+    * @return It will return a LibaioFile instance.
+    * @throws IOException in case of error.
+    */
+   public LibaioFile<Callback> openFile(File file, boolean direct) throws IOException
+   {
+      return openFile(file.getPath(), direct);
+   }
+
+   /**
+    * It will open a file. If you set the direct flag = false then you won't need to use the special buffer.
+    * Notice: This will create an empty file if the file doesn't already exist.
+    *
+    * @param file the file to be open.
+    * @param direct should use O_DIRECT when opening the file.
+    * @return a new open file.
+    * @throws IOException in case of error.
+    */
+   public LibaioFile<Callback> openFile(String file, boolean direct) throws IOException
+   {
+      checkNotNull(file, "path");
+      checkNotNull(ioContext, "IOContext");
+
+      // note: the native layer will throw an IOException in case of errors
+      int res = LibaioContext.open(file, direct);
+
+      return new LibaioFile<>(res, this);
+   }
+
+   /**
+    * It will open a file disassociated with any sort of factory.
+    * This is useful when you won't use reading / writing through libaio like locking files.
+    * @param file a file name
+    * @param direct will use O_DIRECT
+    * @return a new file
+    * @throws IOException in case of error.
+    */
+   public static LibaioFile openControlFile(String file, boolean direct) throws IOException
+   {
+      checkNotNull(file, "path");
+
+      // note: the native layer will throw an IOException in case of errors
+      int res = LibaioContext.open(file, direct);
+
+      return new LibaioFile<>(res, null);
+   }
+
+   /**
+    * It will poll the libaio queue for results. It should block until min is reached.
+    * Results are placed on the callback.
+    * <br>
+    * This shouldn't be called concurrently. You should provide your own synchronization if you need more than one
+    * Thread polling for any reason.
+    * <br>
+    * Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures,
+    *     but it won't call done method for you.
+    *
+    * @param callbacks area to receive the callbacks passed on submission. The size of this array has to
+    *                  be greater than the parameter max.
+    * @param min       the minimum number of elements to receive. It will block until this is achieved.
+    * @param max       The maximum number of elements to receive.
+    * @return Number of callbacks returned.
+    * @see LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)
+    * @see LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)
+    */
+   public int poll(Callback[] callbacks, int min, int max)
+   {
+      int released = poll(ioContext, callbacks, min, max);
+      if (ioSpace != null)
+      {
+         if (released > 0)
+         {
+            ioSpace.release(released);
+         }
+      }
+      return  released;
+   }
+
+   /**
+    * It will start polling and will keep doing until the context is closed.
+    * This will call callbacks on {@link SubmitInfo#onError(int, String)} and
+    *  {@link SubmitInfo#done()}.
+    * In case of error, both {@link SubmitInfo#onError(int, String)} and
+    *   {@link SubmitInfo#done()} are called.
+    */
+   public void poll()
+   {
+      blockedPoll(ioContext);
+   }
+
+   /** Called from the native layer */
+   private void done(SubmitInfo info)
+   {
+      if (ioSpace != null)
+      {
+         ioSpace.release();
+      }
+      info.done();
+   }
+
+   /**
+    * This is the queue for libaio, initialized with queueSize.
+    */
+   private native ByteBuffer newContext(int queueSize);
+
+   /**
+    * Internal method to be used when closing the controller.
+    */
+   private native void deleteContext(ByteBuffer buffer);
+
+   /**
+    * it will return a file descriptor.
+    *
+    * @param path the file name.
+    * @param direct translates as O_DIRECT on open
+    * @return a fd from open C call.
+    */
+   public static native int open(String path, boolean direct);
+
+   static native void close(int fd);
+
+   /**
+    */
+
+   /**
+    * Buffers for O_DIRECT need to use posix_memalign.
+    * <br>
+    * Documented at {@link LibaioFile#newBuffer(int)}.
+    *
+    * @param size needs to be % alignment
+    * @param alignment the alignment used at the device
+    * @return a new native buffer used with posix_memalign
+    */
+   public static native ByteBuffer newAlignedBuffer(int size, int alignment);
+
+   /**
+    * This will call posix free to release the inner buffer allocated at {@link #newAlignedBuffer(int, int)}.
+    * @param buffer a native buffer allocated with {@link #newAlignedBuffer(int, int)}.
+    */
+   public static native void freeBuffer(ByteBuffer buffer);
+
+   /**
+    * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}.
+    */
+   native void submitWrite(int fd,
+                              ByteBuffer libaioContext,
+                              long position, int size, ByteBuffer bufferWrite,
+                              Callback callback) throws IOException;
+
+   /**
+    * Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}.
+    */
+   native void submitRead(int fd,
+                             ByteBuffer libaioContext,
+                             long position, int size, ByteBuffer bufferWrite,
+                             Callback callback) throws IOException;
+
+   /**
+    * Note: this shouldn't be done concurrently.
+    * This method will block until the min condition is satisfied on the poll.
+    * <p/>
+    * The callbacks will include the original callback sent at submit (read or write).
+    */
+   native int poll(ByteBuffer libaioContext, Callback[] callbacks, int min, int max);
+
+   /**
+    * This method will block as long as the context is open.
+    */
+   native void blockedPoll(ByteBuffer libaioContext);
+
+   static native int getNativeVersion();
+
+   public static native boolean lock(int fd);
+
+   public static native void memsetBuffer(ByteBuffer buffer, int size);
+
+   static native long getSize(int fd);
+
+   static native int getBlockSizeFD(int fd);
+
+   public static int getBlockSize(File path)
+   {
+      return getBlockSize(path.getAbsolutePath());
+   }
+
+   public static native int getBlockSize(String path);
+
+   static native void fallocate(int fd, long size);
+
+   static native void fill(int fd, long size);
+
+   static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java
new file mode 100644
index 0000000..bf88d65
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jlibaio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is an extension to use libaio.
+ */
+public final class LibaioFile<Callback extends SubmitInfo>
+{
+   protected boolean open;
+   /**
+    * This represents a structure allocated on the native
+    * this is a io_context_t
+    */
+   final LibaioContext<Callback> ctx;
+
+   private int fd;
+
+   LibaioFile(int fd, LibaioContext ctx)
+   {
+      this.ctx = ctx;
+      this.fd = fd;
+   }
+
+   public int getBlockSize()
+   {
+      return 512;
+      // FIXME
+      //return LibaioContext.getBlockSizeFD(fd);
+   }
+
+   public boolean lock()
+   {
+      return LibaioContext.lock(fd);
+   }
+
+   /**
+    * {@inheritDoc}
+    */
+   public void close() throws IOException
+   {
+      open = false;
+      LibaioContext.close(fd);
+   }
+
+   /**
+    * @return The size of the file.
+    */
+   public long getSize()
+   {
+      return LibaioContext.getSize(fd);
+   }
+
+   /**
+    * It will submit a write to the queue. The callback sent here will be received on the
+    * {@link LibaioContext#poll(SubmitInfo[], int, int)}
+    * In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false.
+    * <br>
+    * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
+    * And don't free the buffer until the callback was called as this could crash the VM.
+    *
+    * @param position The position on the file to write. Notice this has to be a multiple of 512.
+    * @param size     The size of the buffer to use while writing.
+    * @param buffer   if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
+    * @param callback A callback to be returned on the poll method.
+    * @throws java.io.IOException in case of error
+    */
+   public void write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException
+   {
+      ctx.submitWrite(fd, position, size, buffer, callback);
+   }
+
+   /**
+    * It will submit a read to the queue. The callback sent here will be received on the
+    * {@link LibaioContext#poll(SubmitInfo[], int, int)}.
+    * In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false.
+    * <br>
+    * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
+    * And don't free the buffer until the callback was called as this could crash the VM.
+    * *
+    *
+    * @param position The position on the file to read. Notice this has to be a multiple of 512.
+    * @param size     The size of the buffer to use while reading.
+    * @param buffer   if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
+    * @param callback A callback to be returned on the poll method.
+    * @throws java.io.IOException in case of error
+    * @see LibaioContext#poll(SubmitInfo[], int, int)
+    */
+   public void read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException
+   {
+      ctx.submitRead(fd, position, size, buffer, callback);
+   }
+
+   /**
+    * It will allocate a buffer to be used on libaio operations.
+    * Buffers here are allocated with posix_memalign.
+    * <br>
+    * You need to explicitly free the buffer created from here using the
+    * {@link LibaioContext#freeBuffer(java.nio.ByteBuffer)}.
+    *
+    * @param size the size of the buffer.
+    * @return the buffer allocated.
+    */
+   public ByteBuffer newBuffer(int size)
+   {
+      return LibaioContext.newAlignedBuffer(size, 512);
+   }
+
+   /**
+    * It will preallocate the file with a given size.
+    * @param size number of bytes to be filled on the file
+    */
+   public void fill(long size)
+   {
+      try
+      {
+         LibaioContext.fill(fd, size);
+      }
+      catch (OutOfMemoryError e)
+      {
+         NativeLogger.LOGGER.debug("Didn't have enough memory to allocate " + size + " bytes in memory, using simple fallocate");
+         LibaioContext.fallocate(fd, size);
+      }
+   }
+
+   /**
+    * It will use fallocate to initialize a file.
+    * @param size number of bytes to be filled on the file
+    */
+   public void fallocate(long size)
+   {
+      LibaioContext.fallocate(fd, size);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java
new file mode 100644
index 0000000..449c168
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jlibaio;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * Logger Code 14
+ *
+ * each message id must be 6 digits long starting with 14, the 3rd digit denotes the level so
+ *
+ * INFO  1
+ * WARN  2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 1000 to 6000
+ */
+@MessageLogger(projectCode = "jlibaio")
+public interface NativeLogger extends BasicLogger
+{
+   /**
+    * The journal logger.
+    */
+   NativeLogger LOGGER = Logger.getMessageLogger(NativeLogger.class, NativeLogger.class.getPackage().getName());
+
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 1001, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT)
+   void incompatibleNativeLibrary();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java
new file mode 100644
index 0000000..47feea8
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.jlibaio;
+
+public interface SubmitInfo
+{
+   void onError(int errno, String message);
+
+   void done();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java
new file mode 100644
index 0000000..341fa1c
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package handles Linux libaio at a low level.
+ * <br>
+ * Buffers need to be specially allocated by {@link org.apache.activemq.artemis.jlibaio.LibaioContext#newAlignedBuffer(int, int)}
+ * as they need to be aligned to 512 or 4096 when using Direct files.
+ */
+package org.apache.activemq.artemis.jlibaio;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java
new file mode 100644
index 0000000..ec2a630
--- /dev/null
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.jlibaio.util;
+
+
+import org.apache.activemq.artemis.jlibaio.SubmitInfo;
+
+/**
+ * This is a utility class where you can reuse Callback objects for your LibaioContext usage.
+ */
+public class CallbackCache<Callback extends SubmitInfo>
+{
+   private final SubmitInfo[] pool;
+
+   private int put = 0;
+   private int get = 0;
+   private int available = 0;
+   private final int size;
+
+   private final Object lock = new Object();
+
+   public CallbackCache(int size)
+   {
+      this.pool = new SubmitInfo[size];
+      this.size = size;
+   }
+
+   public Callback get()
+   {
+      synchronized (lock)
+      {
+         if (available <= 0)
+         {
+            return null;
+         }
+         else
+         {
+            Callback retValue = (Callback)pool[get];
+            pool[get] = null;
+            if (retValue == null)
+            {
+               throw new NullPointerException("You should initialize the pool before using it");
+            }
+            if (retValue != null)
+            {
+               available--;
+               get++;
+               if (get >= size)
+               {
+                  get = 0;
+               }
+            }
+            return retValue;
+         }
+      }
+   }
+
+   public CallbackCache put(Callback callback)
+   {
+      if (callback == null)
+      {
+         return null;
+      }
+      synchronized (lock)
+      {
+         if (available < size)
+         {
+            available++;
+            pool[put++] = callback;
+            if (put >= size)
+            {
+               put = 0;
+            }
+         }
+      }
+      return this;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java
new file mode 100644
index 0000000..f62f8e2
--- /dev/null
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.jlibaio.test;
+
+import java.util.HashSet;
+
+import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
+import org.apache.activemq.artemis.jlibaio.SubmitInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CallbackCachelTest
+{
+   @Test
+   public void testPartiallyInitialized()
+   {
+      CallbackCache<MyPool> pool = new CallbackCache(100);
+
+
+      for (int i = 0; i < 50; i++)
+      {
+         pool.put(new MyPool(i));
+      }
+
+      MyPool value = pool.get();
+
+      Assert.assertNotNull(value);
+
+      pool.put(value);
+
+
+      // add and remove immediately
+      for (int i = 0; i < 777; i++)
+      {
+         pool.put(pool.get());
+      }
+
+
+      HashSet<MyPool> hashValues = new HashSet<>();
+
+
+      MyPool getValue;
+      while ((getValue = pool.get()) != null)
+      {
+         hashValues.add(getValue);
+      }
+
+
+      Assert.assertEquals(50, hashValues.size());
+   }
+
+   static class MyPool implements SubmitInfo
+   {
+      public final int i;
+
+      MyPool(int i)
+      {
+         this.i = i;
+      }
+
+
+      public int getI()
+      {
+         return i;
+      }
+
+      @Override
+      public void onError(int errno, String message)
+      {
+      }
+
+      @Override
+      public void done()
+      {
+
+      }
+
+      @Override
+      public boolean equals(Object o)
+      {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         MyPool myPool = (MyPool) o;
+
+         if (i != myPool.i) return false;
+
+         return true;
+      }
+
+      @Override
+      public int hashCode()
+      {
+         return i;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
new file mode 100644
index 0000000..e46caf3
--- /dev/null
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.jlibaio.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.jlibaio.SubmitInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test lives in a different package from {@link LibaioFile}
+ * so that it validates the public methods of the API.
+ */
+public class LibaioTest
+{
+
+   /** Skips the whole class when {@link LibaioContext#isLoaded()} reports false. */
+   @BeforeClass
+   public static void testAIO()
+   {
+      final boolean nativeLoaded = LibaioContext.isLoaded();
+      Assume.assumeTrue(nativeLoaded);
+   }
+
+   /**
+    * An arbitrary number of elements to pass to the libaio init method.
+    * Some of the tests use half of this number, so if anyone decides to change it, please use an even number.
+    */
+   private static final int LIBAIO_QUEUE_SIZE = 50;
+
+   // Assigned in the constructor, rooted under ./target
+   @Rule
+   public TemporaryFolder temporaryFolder;
+
+   // Created before each test by setUpFactory() and closed by deleteFactory()
+   public LibaioContext<TestInfo> control;
+
+   /** Creates a fresh {@link LibaioContext} sized with {@link #LIBAIO_QUEUE_SIZE} before each test. */
+   @Before
+   public void setUpFactory()
+   {
+      // NOTE(review): the meaning of the boolean argument is not visible here - confirm against LibaioContext
+      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true);
+   }
+
+   /** Closes the per-test context after each test and then runs {@link #validateLibaio()}. */
+   @After
+   public void deleteFactory()
+   {
+      control.close();
+      validateLibaio();
+   }
+
+   /** Asserts that {@link LibaioContext#getTotalMaxIO()} is 0. */
+   public void validateLibaio()
+   {
+      Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
+   }
+
+   public LibaioTest()
+   {
+        /*
+         *  I didn't use /tmp for three reasons
+         *  - Most systems now will use tmpfs which is not compatible with O_DIRECT
+         *  - This would fill up /tmp in case of failures.
+         *  - target is cleaned up every time you do a mvn clean, so it's safer
+         */
+      File parent = new File("./target");
+      parent.mkdirs();
+      temporaryFolder = new TemporaryFolder(parent);
+   }
+
+   /** Opens a newly created file through the context and closes it again. */
+   @Test
+   public void testOpen() throws Exception
+   {
+      final File testFile = temporaryFolder.newFile("test.bin");
+      final LibaioFile opened = control.openFile(testFile, true);
+      opened.close();
+   }
+
+   /**
+    * Reads 1 MiB from a file sized with fallocate(), then reads 1 MiB from a second file
+    * sized with fill() and verifies that every byte read from the second file is zero.
+    * The second file is always closed and the buffer always released, even when an assertion fails.
+    */
+   @Test
+   public void testInitAndFallocate() throws Exception
+   {
+      TestInfo[] callbacks = new TestInfo[1];
+
+      LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
+      fileDescriptor.fallocate(1024 * 1024);
+
+      ByteBuffer buffer = fileDescriptor.newBuffer(1024 * 1024);
+
+      try
+      {
+         fileDescriptor.read(0, 1024 * 1024, buffer, new TestInfo());
+
+         control.poll(callbacks, 1, 1);
+
+         fileDescriptor.close();
+
+         // the same buffer is reused for the second file
+         buffer.position(0);
+
+         LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true);
+
+         try
+         {
+            fileDescriptor2.fill(1024 * 1024);
+            fileDescriptor2.read(0, 1024 * 1024, buffer, new TestInfo());
+
+            control.poll(callbacks, 1, 1);
+            for (int i = 0; i < 1024 * 1024; i++)
+            {
+               Assert.assertEquals(0, buffer.get());
+            }
+         }
+         finally
+         {
+            // the original never closed this file
+            fileDescriptor2.close();
+         }
+      }
+      finally
+      {
+         LibaioContext.freeBuffer(buffer);
+      }
+   }
+
+   /**
+    * Writes LIBAIO_QUEUE_SIZE / 2 blocks of 'a' to each of two files through the same context,
+    * expects a single poll to return all LIBAIO_QUEUE_SIZE completions, and reads both files
+    * back to verify their content.
+    * Both files and all aligned buffers are released even when an assertion fails.
+    */
+   @Test
+   public void testSubmitWriteOnTwoFiles() throws Exception
+   {
+
+      File file1 = temporaryFolder.newFile("test.bin");
+      File file2 = temporaryFolder.newFile("test2.bin");
+
+      fillupFile(file1, LIBAIO_QUEUE_SIZE / 2);
+      fillupFile(file2, LIBAIO_QUEUE_SIZE / 2);
+
+      LibaioFile[] fileDescriptor = new LibaioFile[]{control.openFile(file1, true),
+         control.openFile(file2, true)};
+
+      try
+      {
+         Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[0].getSize());
+         Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[1].getSize());
+         Assert.assertEquals(fileDescriptor[0].getBlockSize(), fileDescriptor[1].getBlockSize());
+         Assert.assertEquals(LibaioContext.getBlockSize(temporaryFolder.getRoot()), LibaioContext.getBlockSize(file1));
+         Assert.assertEquals(LibaioContext.getBlockSize(file1), LibaioContext.getBlockSize(file2));
+         System.out.println("blockSize = " + fileDescriptor[0].getBlockSize());
+         System.out.println("blockSize /tmp= " + LibaioContext.getBlockSize("/tmp"));
+
+         ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
+
+         try
+         {
+            for (int i = 0; i < 512; i++)
+            {
+               buffer.put((byte) 'a');
+            }
+
+            TestInfo callback = new TestInfo();
+            TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+            for (int i = 0; i < LIBAIO_QUEUE_SIZE / 2; i++)
+            {
+               for (LibaioFile file : fileDescriptor)
+               {
+                  file.write(i * 512, 512, buffer, callback);
+               }
+            }
+
+            Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
+
+            for (Object returnedCallback : callbacks)
+            {
+               // JUnit convention: expected value first
+               Assert.assertSame(callback, returnedCallback);
+            }
+
+            for (LibaioFile file : fileDescriptor)
+            {
+               ByteBuffer bigbuffer = LibaioContext.newAlignedBuffer(512 * 25, 512);
+
+               try
+               {
+                  file.read(0, 512 * 25, bigbuffer, callback);
+                  Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
+
+                  // NOTE(review): presumably only the first entry is rewritten by this poll; the other
+                  // entries still hold the callback stored by the earlier poll - confirm
+                  for (Object returnedCallback : callbacks)
+                  {
+                     Assert.assertSame(callback, returnedCallback);
+                  }
+
+                  for (int i = 0; i < 512 * 25; i++)
+                  {
+                     Assert.assertEquals((byte) 'a', bigbuffer.get());
+                  }
+               }
+               finally
+               {
+                  LibaioContext.freeBuffer(bigbuffer);
+               }
+            }
+         }
+         finally
+         {
+            LibaioContext.freeBuffer(buffer);
+         }
+      }
+      finally
+      {
+         // closed here rather than inside the read loop, so a failed assertion does not leak them
+         for (LibaioFile file : fileDescriptor)
+         {
+            file.close();
+         }
+      }
+   }
+
+   /**
+    * Writes a 512-byte block of 'a' with a callback, then overwrites it with 'B' without a callback,
+    * reads the block back and checks that it contains only 'B'.
+    */
+   @Test
+   public void testSubmitWriteAndRead() throws Exception
+   {
+      TestInfo expected = new TestInfo();
+
+      TestInfo[] polled = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+      LibaioFile aioFile = control.openFile(temporaryFolder.newFile("test.bin"), true);
+
+      ByteBuffer block = LibaioContext.newAlignedBuffer(512, 512);
+
+      try
+      {
+         for (int written = 0; written < 512; written++)
+         {
+            block.put((byte) 'a');
+         }
+
+         block.rewind();
+
+         aioFile.write(0, 512, block, expected);
+
+         Assert.assertEquals(1, control.poll(polled, 1, LIBAIO_QUEUE_SIZE));
+
+         Assert.assertSame(expected, polled[0]);
+
+         // replace the first buffer with a fresh one for the second write
+         LibaioContext.freeBuffer(block);
+
+         block = LibaioContext.newAlignedBuffer(512, 512);
+
+         for (int written = 0; written < 512; written++)
+         {
+            block.put((byte) 'B');
+         }
+
+         // no callback is passed for the second write and the read
+         aioFile.write(0, 512, block, null);
+
+         Assert.assertEquals(1, control.poll(polled, 1, LIBAIO_QUEUE_SIZE));
+
+         block.rewind();
+
+         aioFile.read(0, 512, block, null);
+
+         Assert.assertEquals(1, control.poll(polled, 1, LIBAIO_QUEUE_SIZE));
+
+         for (int checked = 0; checked < 512; checked++)
+         {
+            Assert.assertEquals('B', block.get());
+         }
+      }
+      finally
+      {
+         LibaioContext.freeBuffer(block);
+         aioFile.close();
+      }
+   }
+
+   /**
+    * This test uses libaio without O_DIRECT,
+    * so no specially aligned buffers are needed in this case.
+    */
+   @Test
+   public void testSubmitWriteAndReadRegularBuffers() throws Exception
+   {
+      TestInfo callback = new TestInfo();
+
+      TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+      File file = temporaryFolder.newFile("test.bin");
+
+      fillupFile(file, LIBAIO_QUEUE_SIZE);
+
+      LibaioFile fileDescriptor = control.openFile(file, false);
+
+      final int BUFFER_SIZE = 50;
+
+      ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+
+      try
+      {
+         for (int i = 0; i < BUFFER_SIZE; i++)
+         {
+            buffer.put((byte) 'a');
+         }
+
+         buffer.rewind();
+
+         fileDescriptor.write(0, BUFFER_SIZE, buffer, callback);
+
+         int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE);
+         System.out.println("Return from poll::" + retValue);
+         Assert.assertEquals(1, retValue);
+
+         Assert.assertSame(callback, callbacks[0]);
+
+         buffer.rewind();
+
+         for (int i = 0; i < BUFFER_SIZE; i++)
+         {
+            buffer.put((byte) 'B');
+         }
+
+         fileDescriptor.write(0, BUFFER_SIZE, buffer, null);
+
+         Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
+
+         buffer.rewind();
+
+         // use the constant rather than a literal 50, like every other size in this test
+         fileDescriptor.read(0, BUFFER_SIZE, buffer, null);
+
+         Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
+
+         for (int i = 0; i < BUFFER_SIZE; i++)
+         {
+            Assert.assertEquals('B', buffer.get());
+         }
+      }
+      finally
+      {
+         fileDescriptor.close();
+      }
+   }
+
+   /**
+    * Writes one 512-byte block of '@' to a pre-filled file, reads it back through the same
+    * buffer and checks both the returned callback and the content.
+    */
+   @Test
+   public void testSubmitRead() throws Exception
+   {
+      final int BUFFER_SIZE = 512;
+
+      TestInfo expected = new TestInfo();
+
+      TestInfo[] polled = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+      File dataFile = temporaryFolder.newFile("test.bin");
+
+      fillupFile(dataFile, LIBAIO_QUEUE_SIZE);
+
+      LibaioFile aioFile = control.openFile(dataFile, true);
+
+      ByteBuffer block = LibaioContext.newAlignedBuffer(BUFFER_SIZE, 512);
+
+      try
+      {
+         for (int written = 0; written < BUFFER_SIZE; written++)
+         {
+            block.put((byte) '@');
+         }
+
+         aioFile.write(0, BUFFER_SIZE, block, expected);
+         Assert.assertEquals(1, control.poll(polled, 1, LIBAIO_QUEUE_SIZE));
+         Assert.assertSame(expected, polled[0]);
+
+         block.rewind();
+
+         aioFile.read(0, BUFFER_SIZE, block, expected);
+         Assert.assertEquals(1, control.poll(polled, 1, LIBAIO_QUEUE_SIZE));
+         Assert.assertSame(expected, polled[0]);
+
+         for (int checked = 0; checked < BUFFER_SIZE; checked++)
+         {
+            Assert.assertEquals('@', block.get());
+         }
+      }
+      finally
+      {
+         LibaioContext.freeBuffer(block);
+         aioFile.close();
+      }
+   }
+
+   /**
+    * Checks that invalid writes on a file opened with the flag set to true are reported through
+    * the callback's error state: first a write from a 300-byte buffer from ByteBuffer.allocateDirect,
+    * then, after one valid 512-byte write, a write at position 5.
+    * The buffer obtained from the file is released in the finally block; the original never freed it.
+    */
+   @Test
+   public void testInvalidWrite() throws Exception
+   {
+
+      TestInfo callback = new TestInfo();
+
+      TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+      File file = temporaryFolder.newFile("test.bin");
+
+      fillupFile(file, LIBAIO_QUEUE_SIZE);
+
+      LibaioFile fileDescriptor = control.openFile(file, true);
+
+      // kept apart from the allocateDirect buffer, which must not go through freeBuffer
+      ByteBuffer alignedBuffer = null;
+
+      try
+      {
+         ByteBuffer buffer = ByteBuffer.allocateDirect(300);
+         for (int i = 0; i < 300; i++)
+         {
+            buffer.put((byte) 'z');
+         }
+
+         fileDescriptor.write(0, 300, buffer, callback);
+
+         Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
+
+         Assert.assertTrue(callbacks[0].isError());
+
+         // Error condition
+         Assert.assertSame(callbacks[0], callback);
+
+         System.out.println("Error:" + callbacks[0]);
+
+         alignedBuffer = fileDescriptor.newBuffer(512);
+         for (int i = 0; i < 512; i++)
+         {
+            alignedBuffer.put((byte) 'z');
+         }
+
+         callback = new TestInfo();
+
+         fileDescriptor.write(0, 512, alignedBuffer, callback);
+
+         Assert.assertEquals(1, control.poll(callbacks, 1, 1));
+
+         Assert.assertSame(callback, callbacks[0]);
+
+         fileDescriptor.write(5, 512, alignedBuffer, callback);
+
+         Assert.assertEquals(1, control.poll(callbacks, 1, 1));
+
+         Assert.assertTrue(callbacks[0].isError());
+
+         // drop the references so that checkLeaks can see the callbacks being collected
+         callbacks = null;
+         callback = null;
+
+         TestInfo.checkLeaks();
+      }
+      finally
+      {
+         try
+         {
+            if (alignedBuffer != null)
+            {
+               LibaioContext.freeBuffer(alignedBuffer);
+            }
+         }
+         finally
+         {
+            fileDescriptor.close();
+         }
+      }
+   }
+
+   /**
+    * Submits LIBAIO_QUEUE_SIZE writes, each with its own new callback, polls them all back, and
+    * repeats that LIBAIO_QUEUE_SIZE * 2 times; then verifies through TestInfo.checkLeaks() that
+    * no callback instance stays reachable.
+    * The file is closed in the finally block; the original never closed it.
+    */
+   @Test
+   public void testLeaks() throws Exception
+   {
+      File file = temporaryFolder.newFile("test.bin");
+
+      fillupFile(file, LIBAIO_QUEUE_SIZE * 2);
+
+      TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+
+      LibaioFile<TestInfo> fileDescriptor = control.openFile(file, true);
+
+      ByteBuffer bufferWrite = LibaioContext.newAlignedBuffer(512, 512);
+
+      try
+      {
+         for (int i = 0; i < 512; i++)
+         {
+            bufferWrite.put((byte) 'B');
+         }
+
+         for (int j = 0; j < LIBAIO_QUEUE_SIZE * 2; j++)
+         {
+            for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++)
+            {
+               TestInfo countClass = new TestInfo();
+               fileDescriptor.write(i * 512, 512, bufferWrite, countClass);
+            }
+
+            Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
+
+            for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++)
+            {
+               Assert.assertNotNull(callbacks[i]);
+               // clear the slot so the array does not keep the callback reachable
+               callbacks[i] = null;
+            }
+         }
+
+         TestInfo.checkLeaks();
+      }
+      finally
+      {
+         try
+         {
+            LibaioContext.freeBuffer(bufferWrite);
+         }
+         finally
+         {
+            fileDescriptor.close();
+         }
+      }
+   }
+
+   /** Opens a new file, calls lock() on it and closes it. */
+   @Test
+   public void testLock() throws Exception
+   {
+      LibaioFile lockedFile = control.openFile(temporaryFolder.newFile("test.bin"), true);
+
+      lockedFile.lock();
+      lockedFile.close();
+   }
+
+   /** Opens a new file, calls fill() with 10 MiB as the size and closes it. */
+   @Test
+   public void testAlloc() throws Exception
+   {
+      LibaioFile filledFile = control.openFile(temporaryFolder.newFile("test.bin"), true);
+
+      filledFile.fill(10 * 1024 * 1024);
+      filledFile.close();
+   }
+
+   /** Expects {@link LibaioContext#freeBuffer} to throw an exception when it is given null. */
+   @Test
+   public void testReleaseNullBuffer() throws Exception
+   {
+      boolean exceptionThrown = false;
+      try
+      {
+         LibaioContext.freeBuffer(null);
+      }
+      catch (Exception expected)
+      {
+         exceptionThrown = true;
+      }
+
+      // this message is printed when the assertion fails, i.e. when NO exception was thrown
+      Assert.assertTrue("Expected an exception when freeing a null buffer", exceptionThrown);
+   }
+
+   /**
+    * Fills an aligned buffer with 'z', verifies the content, and then checks that
+    * memsetBuffer() leaves every byte at 0.
+    * The buffer is released in a finally block so a failed assertion does not leak it.
+    */
+   @Test
+   public void testMemset() throws Exception
+   {
+
+      ByteBuffer buffer = LibaioContext.newAlignedBuffer(512 * 8, 512);
+
+      try
+      {
+         for (int i = 0; i < buffer.capacity(); i++)
+         {
+            buffer.put((byte) 'z');
+         }
+
+         buffer.position(0);
+
+         for (int i = 0; i < buffer.capacity(); i++)
+         {
+            Assert.assertEquals((byte) 'z', buffer.get());
+         }
+
+         control.memsetBuffer(buffer);
+
+         buffer.position(0);
+
+         for (int i = 0; i < buffer.capacity(); i++)
+         {
+            Assert.assertEquals((byte) 0, buffer.get());
+         }
+      }
+      finally
+      {
+         LibaioContext.freeBuffer(buffer);
+      }
+
+   }
+
+   /**
+    * Walks through several failure conditions, in this order:
+    * creating a context with Integer.MAX_VALUE as its size, opening a directory as a file,
+    * closing a file twice, submitting one write more than LIBAIO_QUEUE_SIZE without polling,
+    * reading at position 3, and asking for aligned buffers of size 300 and -512.
+    * The file used for the writes is closed in the finally block; the original never closed it.
+    */
+   @Test
+   public void testIOExceptionConditions() throws Exception
+   {
+      boolean exceptionThrown = false;
+
+      // replace the context created by setUpFactory() with one built with the flag set to false
+      control.close();
+      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false);
+      try
+      {
+         // There is no space for a queue this huge, the native layer should throw the exception
+         LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false);
+         // only reached if the size was unexpectedly accepted: do not leak that context
+         newController.close();
+      }
+      catch (RuntimeException e)
+      {
+         exceptionThrown = true;
+      }
+
+      Assert.assertTrue(exceptionThrown);
+      exceptionThrown = false;
+
+      try
+      {
+         // this should throw an exception, we shouldn't be able to open a directory!
+         control.openFile(temporaryFolder.getRoot(), true);
+      }
+      catch (IOException expected)
+      {
+         exceptionThrown = true;
+      }
+
+      Assert.assertTrue(exceptionThrown);
+
+      exceptionThrown = false;
+
+      LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
+      fileDescriptor.close();
+      try
+      {
+         // closing an already closed file is expected to fail
+         fileDescriptor.close();
+      }
+      catch (IOException expected)
+      {
+         exceptionThrown = true;
+      }
+
+      Assert.assertTrue(exceptionThrown);
+
+      fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
+
+      ByteBuffer buffer = fileDescriptor.newBuffer(512);
+
+      try
+      {
+         for (int i = 0; i < 512; i++)
+         {
+            buffer.put((byte) 'a');
+         }
+
+         for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++)
+         {
+            fileDescriptor.write(i * 512, 512, buffer, new TestInfo());
+         }
+
+         // one write beyond LIBAIO_QUEUE_SIZE, with nothing polled yet, is expected to fail
+         boolean ex = false;
+         try
+         {
+            fileDescriptor.write(0, 512, buffer, new TestInfo());
+         }
+         catch (Exception e)
+         {
+            ex = true;
+         }
+
+         Assert.assertTrue(ex);
+
+         TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
+         Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
+
+         // it should be possible to write now after queue space being released
+         fileDescriptor.write(0, 512, buffer, new TestInfo());
+         Assert.assertEquals(1, control.poll(callbacks, 1, 100));
+
+         TestInfo errorCallback = new TestInfo();
+         // odd positions will have failures through O_DIRECT
+         fileDescriptor.read(3, 512, buffer, errorCallback);
+         Assert.assertEquals(1, control.poll(callbacks, 1, 50));
+         Assert.assertTrue(callbacks[0].isError());
+         Assert.assertSame(errorCallback, (callbacks[0]));
+
+         // to help GC and the checkLeaks
+         callbacks = null;
+         errorCallback = null;
+
+         TestInfo.checkLeaks();
+
+         exceptionThrown = false;
+         try
+         {
+            LibaioContext.newAlignedBuffer(300, 512);
+         }
+         catch (RuntimeException e)
+         {
+            exceptionThrown = true;
+         }
+
+         Assert.assertTrue(exceptionThrown);
+
+         exceptionThrown = false;
+         try
+         {
+            LibaioContext.newAlignedBuffer(-512, 512);
+         }
+         catch (RuntimeException e)
+         {
+            exceptionThrown = true;
+         }
+
+         Assert.assertTrue(exceptionThrown);
+      }
+      finally
+      {
+         try
+         {
+            LibaioContext.freeBuffer(buffer);
+         }
+         finally
+         {
+            fileDescriptor.close();
+         }
+      }
+   }
+
+   /**
+    * Submits 5000 writes of 512 bytes against a dedicated context while a second thread sits in
+    * the context's no-argument poll(), and waits until done() has been called once per write.
+    * Compared with the original this also asserts that no error was reported, releases the buffer
+    * and the file, always closes the context and joins the polling thread, and no longer divides
+    * by zero when all submissions finish within the same millisecond.
+    */
+   @Test
+   public void testBlockedCallback() throws Exception
+   {
+      final LibaioContext blockedContext = new LibaioContext(500, true);
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            blockedContext.poll();
+         }
+      };
+
+      t.start();
+
+      try
+      {
+         int NUMBER_OF_BLOCKS = 5000;
+
+         final CountDownLatch latch = new CountDownLatch(NUMBER_OF_BLOCKS);
+
+         File file = temporaryFolder.newFile("sub-file.txt");
+         LibaioFile aioFile = blockedContext.openFile(file, true);
+         aioFile.fill(NUMBER_OF_BLOCKS * 512);
+
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         class MyCallback implements SubmitInfo
+         {
+            @Override
+            public void onError(int errno, String message)
+            {
+               errors.incrementAndGet();
+            }
+
+            @Override
+            public void done()
+            {
+               latch.countDown();
+            }
+         }
+
+         MyCallback callback = new MyCallback();
+
+         ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
+
+         for (int i = 0; i < 512; i++)
+         {
+            buffer.put((byte)'a');
+         }
+
+         long start = System.currentTimeMillis();
+
+         for (int i = 0; i < NUMBER_OF_BLOCKS; i++)
+         {
+            aioFile.write(i * 512, 512, buffer, callback);
+         }
+
+         // taken right after the last submission, before waiting for the completions
+         long end = System.currentTimeMillis();
+
+         latch.await();
+
+         // NOTE(review): assumes the native layer no longer uses the buffer once done() has been
+         // called for every write - confirm
+         LibaioContext.freeBuffer(buffer);
+
+         Assert.assertEquals(0, errors.get());
+
+         // guard the division: start and end can fall within the same millisecond
+         long elapsedForRate = Math.max(1, end - start);
+
+         System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / elapsedForRate);
+
+         // NOTE(review): the reason for this pause is not visible in this test - confirm it is still needed
+         Thread.sleep(100);
+
+         aioFile.close();
+      }
+      finally
+      {
+         // the polling thread is joined right after the context is closed, as in the original
+         blockedContext.close();
+         t.join();
+      }
+   }
+
+   private void fillupFile(File file, int blocks) throws IOException
+   {
+      FileOutputStream fileOutputStream = new FileOutputStream(file);
+      byte[] bufferWrite = new byte[512];
+      for (int i = 0; i < 512; i++)
+      {
+         bufferWrite[i] = (byte) 0;
+      }
+
+      for (int i = 0; i < blocks; i++)
+      {
+         fileOutputStream.write(bufferWrite);
+      }
+
+      fileOutputStream.close();
+   }
+
+
+   static class TestInfo implements SubmitInfo
+   {
+      static AtomicInteger count = new AtomicInteger();
+
+      @Override
+      protected void finalize() throws Throwable
+      {
+         super.finalize();
+         count.decrementAndGet();
+      }
+
+      public static void checkLeaks() throws InterruptedException
+      {
+         for (int i = 0; count.get() != 0 && i < 50; i++)
+         {
+            WeakReference reference = new WeakReference(new Object());
+            while (reference.get() != null)
+            {
+               System.gc();
+               Thread.sleep(100);
+            }
+         }
+         Assert.assertEquals(0, count.get());
+      }
+
+      boolean error = false;
+      String errorMessage;
+      int errno;
+
+      public TestInfo()
+      {
+         count.incrementAndGet();
+      }
+
+      @Override
+      public void onError(int errno, String message)
+      {
+         this.errno = errno;
+         this.errorMessage = message;
+         this.error = true;
+      }
+
+      @Override
+      public void done()
+      {
+      }
+
+      public int getErrno()
+      {
+         return errno;
+      }
+
+      public void setErrno(int errno)
+      {
+         this.errno = errno;
+      }
+
+      public boolean isError()
+      {
+         return error;
+      }
+
+      public void setError(boolean error)
+      {
+         this.error = error;
+      }
+
+      public String getErrorMessage()
+      {
+         return errorMessage;
+      }
+
+      public void setErrorMessage(String errorMessage)
+      {
+         this.errorMessage = errorMessage;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 71610b4..f74d6d7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.proton.plug;
 import java.util.concurrent.Executor;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -30,7 +31,6 @@ import org.apache.qpid.proton.jms.EncodedMessage;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
 import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -275,7 +275,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
 
       serverSession.send(message, false);
 
-      manager.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
+      manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback()
       {
          @Override
          public void done()

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index aa3f9e0..45260d5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -22,7 +22,7 @@ import io.netty.buffer.EmptyByteBuf;
 import io.netty.handler.codec.mqtt.MqttMessageType;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@@ -183,7 +183,7 @@ public class MQTTPublishManager
 
    private void createMessageAck(final int messageId, final int qos)
    {
-      session.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
+      session.getServer().getStorageManager().afterCompleteOperations(new IOCallback()
       {
          @Override
          public void done()


Mime
View raw message