Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0C4BD18216 for ; Thu, 30 Jul 2015 09:14:24 +0000 (UTC) Received: (qmail 45385 invoked by uid 500); 30 Jul 2015 09:14:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 45249 invoked by uid 500); 30 Jul 2015 09:14:23 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 45054 invoked by uid 99); 30 Jul 2015 09:14:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jul 2015 09:14:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 94F29E6834; Thu, 30 Jul 2015 09:14:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Thu, 30 Jul 2015 09:14:26 -0000 Message-Id: <11d574c03c244811b2d9b5044c0fde86@git.apache.org> In-Reply-To: <00c35ef8945f47939392b209f799f4cf@git.apache.org> References: <00c35ef8945f47939392b209f799f4cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c 
b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c new file mode 100644 index 0000000..b6fcdd3 --- /dev/null +++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c @@ -0,0 +1,710 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _GNU_SOURCE +// libaio, O_DIRECT and other things won't be available without this define +#define _GNU_SOURCE +#endif + +//#define DEBUG + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h" +#include "exception_helper.h" + +struct io_control { + io_context_t ioContext; + struct io_event * events; + + jobject thisObject; + + // This is used to make sure we don't return IOCB while something else is using them + // this is to guarantee the submits could be done concurrently with polling + pthread_mutex_t iocbLock; + + pthread_mutex_t pollLock; + + // a resuable pool of iocb + struct iocb ** iocb; + int queueSize; + int iocbPut; + int iocbGet; + int used; +}; + +jclass submitClass = NULL; +jmethodID errorMethod = NULL; +jmethodID doneMethod = NULL; +jmethodID libaioContextDone = NULL; + +jclass libaioContextClass = NULL; +jclass runtimeExceptionClass = NULL; +jclass ioExceptionClass = NULL; + +// util methods +void throwRuntimeException(JNIEnv* env, char* message) { + (*env)->ThrowNew(env, runtimeExceptionClass, message); +} + +void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) { + char* allocatedMessage = exceptionMessage(message, errorNumber); + (*env)->ThrowNew(env, runtimeExceptionClass, allocatedMessage); + free(allocatedMessage); +} + +void throwIOException(JNIEnv* env, char* message) { + (*env)->ThrowNew(env, ioExceptionClass, message); +} + +void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) { + char* allocatedMessage = exceptionMessage(message, errorNumber); + (*env)->ThrowNew(env, ioExceptionClass, allocatedMessage); + free(allocatedMessage); +} + +void throwOutOfMemoryError(JNIEnv* env) { + jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError"); + (*env)->ThrowNew(env, exceptionClass, ""); +} + +/** Notice: every usage of exceptionMessage needs to release the 
allocated memory for the sequence of char */ +char* exceptionMessage(char* msg, int error) { + if (error < 0) { + // some functions return negative values + // and it's hard to keep track of when to send -error and when not + // this will just take care when things are forgotten + // what would generate a proper error + error = error * -1; + } + //strerror is returning a constant, so no need to free anything coming from strerror + char* err = strerror(error); + char* result = malloc(strlen(msg) + strlen(err) + 1); + strcpy(result, msg); + strcat(result, err); + return result; +} + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { + return JNI_ERR; + } else { + jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException"); + if (localRuntimeExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass); + if (runtimeExceptionClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env); + return JNI_ERR; + } + + jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException"); + if (localIoExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass); + if (ioExceptionClass == NULL) { + // out-of-memory! 
+ throwOutOfMemoryError(env); + return JNI_ERR; + } + + submitClass = (*env)->FindClass(env, "org/apache/activemq/artemis/jlibaio/SubmitInfo"); + if (submitClass == NULL) { + return JNI_ERR; + } + + submitClass = (jclass)(*env)->NewGlobalRef(env, (jobject)submitClass); + + errorMethod = (*env)->GetMethodID(env, submitClass, "onError", "(ILjava/lang/String;)V"); + if (errorMethod == NULL) { + return JNI_ERR; + } + errorMethod = (jmethodID)(*env)->NewGlobalRef(env, (jobject)(errorMethod)); + + doneMethod = (*env)->GetMethodID(env, submitClass, "done", "()V"); + if (doneMethod == NULL) { + return JNI_ERR; + } + doneMethod = (jmethodID)(*env)->NewGlobalRef(env, (jobject)(doneMethod)); + + libaioContextClass = (*env)->FindClass(env, "org/apache/activemq/artemis/jlibaio/LibaioContext"); + if (libaioContextClass == NULL) { + return JNI_ERR; + } + libaioContextClass = (jclass)(*env)->NewGlobalRef(env, (jobject)libaioContextClass); + + libaioContextDone = (*env)->GetMethodID(env, libaioContextClass, "done", "(Lorg/apache/activemq/artemis/jlibaio/SubmitInfo;)V"); + if (libaioContextDone == NULL) { + return JNI_ERR; + } + libaioContextDone = (jmethodID)(*env)->NewGlobalRef(env, (jobject)libaioContextDone); + + return JNI_VERSION_1_6; + } +} + +void JNI_OnUnload(JavaVM* vm, void* reserved) { + JNIEnv* env; + if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { + // Something is wrong but nothing we can do about this :( + return; + } else { + // delete global references so the GC can collect them + if (runtimeExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, runtimeExceptionClass); + } + if (ioExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, ioExceptionClass); + } + + // Deleting global refs so their classes can be GCed + if (errorMethod != NULL) { + (*env)->DeleteGlobalRef(env, (jobject)errorMethod); + } + + if (doneMethod != NULL) { + (*env)->DeleteGlobalRef(env, (jobject)doneMethod); + } + + if (submitClass != NULL) { + 
(*env)->DeleteGlobalRef(env, (jobject)submitClass); + } + + if (libaioContextClass != NULL) { + (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass); + } + + if (libaioContextDone != NULL) { + (*env)->DeleteGlobalRef(env, (jobject)libaioContextDone); + } + } +} + +static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) { + struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer); + if (ioControl == NULL) { + throwRuntimeException(env, "Controller not initialized"); + } + return ioControl; +} + +/** + * remove an iocb from the pool of IOCBs. Returns null if full + */ +static inline struct iocb * getIOCB(struct io_control * control) { + struct iocb * iocb = 0; + + pthread_mutex_lock(&(control->iocbLock)); + + #ifdef DEBUG + fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); + #endif + + if (control->used < control->queueSize) { + control->used++; + iocb = control->iocb[control->iocbGet++]; + + if (control->iocbGet >= control->queueSize) { + control->iocbGet = 0; + } + } + + pthread_mutex_unlock(&(control->iocbLock)); + return iocb; +} + +/** + * Put an iocb back on the pool of IOCBs + */ +static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) { + pthread_mutex_lock(&(control->iocbLock)); + + #ifdef DEBUG + fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); + #endif + + control->used--; + control->iocb[control->iocbPut++] = iocbBack; + if (control->iocbPut >= control->queueSize) { + control->iocbPut = 0; + } + pthread_mutex_unlock(&(control->iocbLock)); +} + +static inline void * getBuffer(JNIEnv* env, jobject pointer) { + return (*env)->GetDirectBufferAddress(env, pointer); +} + +JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_lock + (JNIEnv * env, jclass clazz, 
jint handle) { + return flock(handle, LOCK_EX | LOCK_NB) == 0; +} + +/** + * Everything that is allocated here will be freed at deleteContext when the class is unloaded. + */ +JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newContext(JNIEnv* env, jobject thisObject, jint queueSize) { + io_context_t libaioContext; + int i = 0; + + #ifdef DEBUG + fprintf (stdout, "Initializing context\n"); + #endif + + int res = io_queue_init(queueSize, &libaioContext); + if (res) { + // Error, so need to release whatever was done before + free(libaioContext); + + throwRuntimeExceptionErrorNo(env, "Cannot initialize queue:", res); + return NULL; + } + + struct iocb ** iocb = (struct iocb **)malloc((sizeof(struct iocb *) * (size_t)queueSize)); + if (iocb == NULL) { + throwOutOfMemoryError(env); + return NULL; + } + + for (i = 0; i < queueSize; i++) { + iocb[i] = (struct iocb *)malloc(sizeof(struct iocb)); + if (iocb[i] == NULL) { + // it's unlikely this would happen at this point + // for that reason I'm not cleaning up individual IOCBs here + // we could increase support here with a cleanup of any previously allocated iocb + // But I'm afraid of adding not needed complexity here + throwOutOfMemoryError(env); + return NULL; + } + } + + struct io_control * theControl = (struct io_control *) malloc(sizeof(struct io_control)); + if (theControl == NULL) { + throwOutOfMemoryError(env); + return NULL; + } + + res = pthread_mutex_init(&(theControl->iocbLock), 0); + if (res) { + free(theControl); + free(libaioContext); + throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); + return NULL; + } + + res = pthread_mutex_init(&(theControl->pollLock), 0); + if (res) { + free(theControl); + free(libaioContext); + throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); + return NULL; + } + + struct io_event * events = (struct io_event *)malloc(sizeof(struct io_event) * (size_t)queueSize); + + theControl->ioContext = libaioContext; + 
theControl->events = events; + theControl->iocb = iocb; + theControl->queueSize = queueSize; + theControl->iocbPut = 0; + theControl->iocbGet = 0; + theControl->used = 0; + theControl->thisObject = (*env)->NewGlobalRef(env, thisObject); + + return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control)); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject contextPointer) { + int i; + struct io_control * theControl = getIOControl(env, contextPointer); + if (theControl == NULL) { + return; + } + + io_queue_release(theControl->ioContext); + + // to make sure the poll has finished + pthread_mutex_lock(&(theControl->pollLock)); + pthread_mutex_unlock(&(theControl->pollLock)); + + pthread_mutex_destroy(&(theControl->pollLock)); + pthread_mutex_destroy(&(theControl->iocbLock)); + + // Releasing each individual iocb + for (i = 0; i < theControl->queueSize; i++) { + free(theControl->iocb[i]); + } + + (*env)->DeleteGlobalRef(env, theControl->thisObject); + + free(theControl->iocb); + free(theControl->events); + free(theControl); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) { + if (close(fd) < 0) { + throwIOExceptionErrorNo(env, "Error closing file:", errno); + } +} + +JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_open(JNIEnv* env, jclass clazz, + jstring path, jboolean direct) { + const char* f_path = (*env)->GetStringUTFChars(env, path, 0); + + int res; + if (direct) { + res = open(f_path, O_RDWR | O_CREAT | O_DIRECT, 0666); + } else { + res = open(f_path, O_RDWR | O_CREAT, 0666); + } + + (*env)->ReleaseStringUTFChars(env, path, f_path); + + if (res < 0) { + throwIOExceptionErrorNo(env, "Cannot open file:", errno); + } + + return res; +} + +static inline void submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { + int result = 
io_submit(theControl->ioContext, 1, &iocb); + + if (result < 0) { + // Putting the Global Ref and IOCB back in case of a failure + (*env)->DeleteGlobalRef(env, (jobject)iocb->data); + putIOCB(theControl, iocb); + + throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); + } + + return; +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite + (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) { + struct io_control * theControl = getIOControl(env, contextPointer); + if (theControl == NULL) { + return; + } + + #ifdef DEBUG + fprintf (stdout, "submitWrite position %ld, size %d\n", position, size); + #endif + + struct iocb * iocb = getIOCB(theControl); + + if (iocb == NULL) { + throwIOException(env, "Not enough space in libaio queue"); + return; + } + + io_prep_pwrite(iocb, fileHandle, getBuffer(env, bufferWrite), (size_t)size, position); + + // The GlobalRef will be deleted when poll is called. this is done so + // the vm wouldn't crash if the Callback passed by the user is GCed between submission + // and callback. + // also as the real intention is to hold the reference until the life cycle is complete + iocb->data = (void *) (*env)->NewGlobalRef(env, callback); + + return submit(env, theControl, iocb); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead + (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferRead, jobject callback) { + struct io_control * theControl = getIOControl(env, contextPointer); + if (theControl == NULL) { + return; + } + + struct iocb * iocb = getIOCB(theControl); + + if (iocb == NULL) { + throwIOException(env, "Not enough space in libaio queue"); + return; + } + + io_prep_pread(iocb, fileHandle, getBuffer(env, bufferRead), (size_t)size, position); + + // The GlobalRef will be deleted when poll is called. 
this is done so + // the vm wouldn't crash if the Callback passed by the user is GCed between submission + // and callback. + // also as the real intention is to hold the reference until the life cycle is complete + iocb->data = (void *) (*env)->NewGlobalRef(env, callback); + + return submit(env, theControl, iocb); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll + (JNIEnv * env, jobject thisObject, jobject contextPointer) { + + #ifdef DEBUG + fprintf (stdout, "Running blockedPoll"); + #endif + + int i; + struct io_control * theControl = getIOControl(env, contextPointer); + if (theControl == NULL) { + return; + } + int max = theControl->queueSize; + pthread_mutex_lock(&(theControl->pollLock)); + + for (;;) { + + int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0); + + if (result < 0) + { + #ifdef DEBUG + fprintf (stdout, "finished blockedPoll rutine with result=%d\n", result); + #endif + break; + } + #ifdef DEBUG + fprintf (stdout, "blockedPoll returned %d events\n", result); + #endif + + for (i = 0; i < result; i++) + { + #ifdef DEBUG + fprintf (stdout, "blockedPoll treading event %d\n", i); + #endif + struct io_event * event = &(theControl->events[i]); + struct iocb * iocbp = event->obj; + int eventResult = (int)event->res; + + #ifdef DEBUG + fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); + #endif + + if (eventResult < 0) { + #ifdef DEBUG + fprintf (stdout, "Error: %s\n", strerror(-eventResult)); + #endif + + jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); + + if (iocbp->data != NULL) { + (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); + } + } + + jobject obj = (jobject)iocbp->data; + putIOCB(theControl, iocbp); + + if (obj != NULL) { + (*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj); + // We delete the globalRef after the completion of the callback + 
(*env)->DeleteGlobalRef(env, obj); + } + + } + } + + pthread_mutex_unlock(&(theControl->pollLock)); + +} + +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_poll + (JNIEnv * env, jobject obj, jobject contextPointer, jobjectArray callbacks, jint min, jint max) { + int i = 0; + struct io_control * theControl = getIOControl(env, contextPointer); + if (theControl == NULL) { + return 0; + } + + + int result = io_getevents(theControl->ioContext, min, max, theControl->events, 0); + int retVal = result; + + for (i = 0; i < result; i++) { + struct io_event * event = &(theControl->events[i]); + struct iocb * iocbp = event->obj; + int eventResult = (int)event->res; + + #ifdef DEBUG + fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); + #endif + + if (eventResult < 0) { + #ifdef DEBUG + fprintf (stdout, "Error: %s\n", strerror(-eventResult)); + #endif + + jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); + + (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); + } + + (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data); + + if (iocbp->data != NULL) { + // We delete the globalRef after the completion of the callback + (*env)->DeleteGlobalRef(env, (jobject)iocbp->data); + } + + putIOCB(theControl, iocbp); + } + + return retVal; +} + +JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newAlignedBuffer +(JNIEnv * env, jclass clazz, jint size, jint alignment) { + if (size % alignment != 0) { + throwRuntimeException(env, "Buffer size needs to be aligned to passed argument"); + return NULL; + } + + // This will allocate a buffer, aligned by alignment. 
+ // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory + // NOTE: this buffer will contain non initialized data, you must fill it up properly + void * buffer; + int result = posix_memalign(&buffer, (size_t)alignment, (size_t)size); + + if (result) { + throwRuntimeExceptionErrorNo(env, "Can't allocate posix buffer:", result); + return NULL; + } + + memset(buffer, 0, (size_t)size); + + return (*env)->NewDirectByteBuffer(env, buffer, size); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_freeBuffer + (JNIEnv * env, jclass clazz, jobject jbuffer) { + if (jbuffer == NULL) + { + throwRuntimeException(env, "Null pointer"); + return; + } + void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + free(buffer); +} + + +/** It does nothing... just return true to make sure it has all the binary dependencies */ +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getNativeVersion + (JNIEnv * env, jclass clazz) + +{ + return org_apache_activemq_artemis_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION; +} + +JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getSize + (JNIEnv * env, jclass clazz, jint fd) +{ + struct stat statBuffer; + + if (fstat(fd, &statBuffer) < 0) + { + throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); + return -1l; + } + return statBuffer.st_size; +} + +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSizeFD + (JNIEnv * env, jclass clazz, jint fd) +{ + struct stat statBuffer; + + if (fstat(fd, &statBuffer) < 0) + { + throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); + return -1l; + } + return statBuffer.st_blksize; +} + +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSize + (JNIEnv * env, jclass clazz, jstring path) +{ + const char* f_path = 
(*env)->GetStringUTFChars(env, path, 0); + struct stat statBuffer; + + if (stat(f_path, &statBuffer) < 0) + { + throwIOExceptionErrorNo(env, "Cannot determine file size:", errno); + return -1l; + } + + (*env)->ReleaseStringUTFChars(env, path, f_path); + + return statBuffer.st_blksize; +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fallocate + (JNIEnv * env, jclass clazz, jint fd, jlong size) +{ + if (fallocate(fd, 0, 0, (off_t) size) < 0) + { + throwIOExceptionErrorNo(env, "Could not preallocate file", errno); + } + fsync(fd); + lseek (fd, 0, SEEK_SET); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill + (JNIEnv * env, jclass clazz, jint fd, jlong size) +{ + void * preAllocBuffer = 0; + if (posix_memalign(&preAllocBuffer, 512, size) != 0) + { + throwOutOfMemoryError(env); + return; + } + memset(preAllocBuffer, 0, size); + lseek (fd, 0, SEEK_SET); + write(fd, preAllocBuffer, size); + lseek (fd, 0, SEEK_SET); + free (preAllocBuffer); +} + +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer + (JNIEnv *env, jclass clazz, jobject jbuffer, jint size) +{ + #ifdef DEBUG + fprintf (stdout, "Mem setting buffer with %d bytes\n", size); + #endif + void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + + if (buffer == 0) + { + throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); + return; + } + + memset(buffer, 0, (size_t)size); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java b/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java deleted file mode 100644 index 51f5da7..0000000 --- 
a/artemis-native/src/main/java/org/apache/activemq/artemis/core/libaio/Native.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.libaio; - -import java.nio.ByteBuffer; - -import org.apache.activemq.artemis.api.core.ActiveMQException; - -public class Native -{ - // Functions used for locking files ..... 
- public static native int openFile(String fileName); - - public static native void closeFile(int handle); - - public static native boolean flock(int handle); - // Functions used for locking files ^^^^^^^^ - - public static native void resetBuffer(ByteBuffer directByteBuffer, int size); - - public static native void destroyBuffer(ByteBuffer buffer); - - public static native ByteBuffer newNativeBuffer(long size); - - public static native void newInit(Class someClass); - - public static native ByteBuffer init(Class controllerClass, String fileName, int maxIO, Object logger) throws ActiveMQException; - - public static native long size0(ByteBuffer handle); - - public static native void write(Object thisObject, ByteBuffer handle, - long sequence, - long position, - long size, - ByteBuffer buffer, - Object aioPackageCallback) throws ActiveMQException; - - /** a direct write to the file without the use of libaio's submit. */ - public static native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException; - - /** - *This is using org.apache.activemq.artemis.core.asyncio.AIOCallback - */ - public static native void read(Object thisObject, ByteBuffer handle, long position, long size, ByteBuffer buffer, Object aioPackageCallback) throws ActiveMQException; - - public static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws ActiveMQException; - - public static native void closeInternal(ByteBuffer handler); - - public static native void stopPoller(ByteBuffer handler); - - /** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */ - public static native int getNativeVersion(); - - /** Poll asynchronous events from internal queues */ - public static native void internalPollEvents(ByteBuffer handler); - - // Inner classes --------------------------------------------------------------------- - -} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java new file mode 100644 index 0000000..8b45f54 --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jlibaio; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +/** + * This class is used as an aggregator for the {@link LibaioFile}. + *
+ * It holds native data, and it will share a libaio queue that can be used by multiple files. + *
+ * You need to use the poll methods to read the result of write and read submissions. + *
+ * You also need to use the special buffer created by {@link LibaioFile} as you need special alignments + * when dealing with O_DIRECT files. + *
+ * A single controller can serve multiple files. There's no need to create one controller per file. + *
+ * Interesting reading for this. + */ +public class LibaioContext implements Closeable +{ + + private static final AtomicLong totalMaxIO = new AtomicLong(0); + + /** + * This definition needs to match Version.h on the native sources. + *
+ * Or else the native module won't be loaded because of version mismatches + */ + private static final int EXPECTED_NATIVE_VERSION = 1; + + private static boolean loaded = false; + + public static boolean isLoaded() + { + return loaded; + } + + private static boolean loadLibrary(final String name) + { + try + { + System.loadLibrary(name); + if (getNativeVersion() != EXPECTED_NATIVE_VERSION) + { + NativeLogger.LOGGER.incompatibleNativeLibrary(); + return false; + } + else + { + return true; + } + } + catch (Throwable e) + { + NativeLogger.LOGGER.debug(name + " -> error loading the native library", e); + return false; + } + + } + + static + { + String[] libraries = new String[]{"artemis-native-64", "artemis-native-32"}; + + for (String library : libraries) + { + if (loadLibrary(library)) + { + loaded = true; + break; + } + else + { + NativeLogger.LOGGER.debug("Library " + library + " not found!"); + } + } + + if (!loaded) + { + NativeLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper"); + } + } + + /** + * This is used to validate leaks on tests. + * @return the number of allocated aio, to be used on test checks. + */ + public static long getTotalMaxIO() + { + return totalMaxIO.get(); + } + + /** + * It will reset all the positions on the buffer to 0, using memset. + * @param buffer a native buffer. +s */ + public void memsetBuffer(ByteBuffer buffer) + { + memsetBuffer(buffer, buffer.limit()); + } + + /** + * This is used on tests validating for leaks. + */ + public static void resetMaxAIO() + { + totalMaxIO.set(0); + } + + /** + * the native ioContext including the structure created. + */ + private final ByteBuffer ioContext; + + private final AtomicBoolean closed = new AtomicBoolean(false); + + final Semaphore ioSpace; + + final int queueSize; + + /** + * The queue size here will use resources defined on the kernel parameter + * fs.aio-max-nr . 
+ * + * @param queueSize the size to be initialize on libaio + * io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr. + * @param useSemaphore should block on a semaphore avoiding using more submits than what's available. + */ + public LibaioContext(int queueSize, boolean useSemaphore) + { + try + { + this.ioContext = newContext(queueSize); + } + catch (Exception e) + { + throw e; + } + this.queueSize = queueSize; + totalMaxIO.addAndGet(queueSize); + if (useSemaphore) + { + this.ioSpace = new Semaphore(queueSize); + } + else + { + this.ioSpace = null; + } + } + + + /** + * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)} + * @param fd the file descriptor + * @param position the write position + * @param size number of bytes to use + * @param bufferWrite the native buffer + * @param callback a callback + * @throws IOException in case of error + */ + public void submitWrite(int fd,long position, int size, + ByteBuffer bufferWrite, Callback callback) throws IOException + { + try + { + if (ioSpace != null) + { + ioSpace.acquire(); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new IOException(e.getMessage(), e); + } + submitWrite(fd, this.ioContext, position, size, bufferWrite, callback); + } + + public void submitRead(int fd, long position, int size, ByteBuffer bufferWrite, + Callback callback) throws IOException + { + try + { + if (ioSpace != null) + { + ioSpace.acquire(); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new IOException(e.getMessage(), e); + } + submitRead(fd, this.ioContext, position, size, bufferWrite, callback); + } + + + /** + * This is used to close the libaio queues and cleanup the native data used. + *
+ * It is unsafe to close the controller while you have pending writes or files open as + * this could cause core dumps or VM crashes. + */ + @Override + public void close() + { + if (!closed.getAndSet(true)) + { + totalMaxIO.addAndGet(-queueSize); + + if (ioContext != null) + { + deleteContext(ioContext); + } + } + } + + @Override + protected void finalize() throws Throwable + { + super.finalize(); + close(); + } + + /** + * It will open a file. If you set the direct flag = false then you won't need to use the special buffer. + * Notice: This will create an empty file if the file doesn't already exist. + * + * @param file the file to be open. + * @param direct will set ODIRECT. + * @return It will return a LibaioFile instance. + * @throws IOException in case of error. + */ + public LibaioFile openFile(File file, boolean direct) throws IOException + { + return openFile(file.getPath(), direct); + } + + /** + * It will open a file. If you set the direct flag = false then you won't need to use the special buffer. + * Notice: This will create an empty file if the file doesn't already exist. + * + * @param file the file to be open. + * @param direct should use O_DIRECT when opening the file. + * @return a new open file. + * @throws IOException in case of error. + */ + public LibaioFile openFile(String file, boolean direct) throws IOException + { + checkNotNull(file, "path"); + checkNotNull(ioContext, "IOContext"); + + // note: the native layer will throw an IOException in case of errors + int res = LibaioContext.open(file, direct); + + return new LibaioFile<>(res, this); + } + + /** + * It will open a file disassociated with any sort of factory. + * This is useful when you won't use reading / writing through libaio like locking files. + * @param file a file name + * @param direct will use O_DIRECT + * @return a new file + * @throws IOException in case of error. 
+ */ + public static LibaioFile openControlFile(String file, boolean direct) throws IOException + { + checkNotNull(file, "path"); + + // note: the native layer will throw an IOException in case of errors + int res = LibaioContext.open(file, direct); + + return new LibaioFile<>(res, null); + } + + /** + * It will poll the libaio queue for results. It should block until min is reached + * Results are placed on the callback. + *
+ * This shouldn't be called concurrently. You should provide your own synchronization if you need more than one + * Thread polling for any reason. + *
+ * Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures, + * but it won't call done method for you. + * + * @param callbacks area to receive the callbacks passed on submission.The size of this callback has to + * be greater than the parameter max. + * @param min the minimum number of elements to receive. It will block until this is achieved. + * @param max The maximum number of elements to receive. + * @return Number of callbacks returned. + * @see LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo) + * @see LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo) + */ + public int poll(Callback[] callbacks, int min, int max) + { + int released = poll(ioContext, callbacks, min, max); + if (ioSpace != null) + { + if (released > 0) + { + ioSpace.release(released); + } + } + return released; + } + + /** + * It will start polling and will keep doing until the context is closed. + * This will call callbacks on {@link SubmitInfo#onError(int, String)} and + * {@link SubmitInfo#done()}. + * In case of error, both {@link SubmitInfo#onError(int, String)} and + * {@link SubmitInfo#done()} are called. + */ + public void poll() + { + blockedPoll(ioContext); + } + + /** Called from the native layer */ + private void done(SubmitInfo info) + { + if (ioSpace != null) + { + ioSpace.release(); + } + info.done(); + } + + /** + * This is the queue for libaio, initialized with queueSize. + */ + private native ByteBuffer newContext(int queueSize); + + /** + * Internal method to be used when closing the controller. + */ + private native void deleteContext(ByteBuffer buffer); + + /** + * it will return a file descriptor. + * + * @param path the file name. + * @param direct translates as O_DIRECT On open + * @return a fd from open C call. + */ + public static native int open(String path, boolean direct); + + static native void close(int fd); + + /** + */ + + /** + * Buffers for O_DIRECT need to use posix_memalign. + *
+ * Documented at {@link LibaioFile#newBuffer(int)}. + * + * @param size needs to be % alignment + * @param alignment the alignment used at the dispositive + * @return a new native buffer used with posix_memalign + */ + public static native ByteBuffer newAlignedBuffer(int size, int alignment); + + /** + * This will call posix free to release the inner buffer allocated at {@link #newAlignedBuffer(int, int)}. + * @param buffer a native buffer allocated with {@link #newAlignedBuffer(int, int)}. + */ + public static native void freeBuffer(ByteBuffer buffer); + + /** + * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}. + */ + native void submitWrite(int fd, + ByteBuffer libaioContext, + long position, int size, ByteBuffer bufferWrite, + Callback callback) throws IOException; + + /** + * Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}. + */ + native void submitRead(int fd, + ByteBuffer libaioContext, + long position, int size, ByteBuffer bufferWrite, + Callback callback) throws IOException; + + /** + * Note: this shouldn't be done concurrently. + * This method will block until the min condition is satisfied on the poll. + *

+ * The callbacks will include the original callback sent at submit (read or write). + */ + native int poll(ByteBuffer libaioContext, Callback[] callbacks, int min, int max); + + /** + * This method will block as long as the context is open. + */ + native void blockedPoll(ByteBuffer libaioContext); + + static native int getNativeVersion(); + + public static native boolean lock(int fd); + + public static native void memsetBuffer(ByteBuffer buffer, int size); + + static native long getSize(int fd); + + static native int getBlockSizeFD(int fd); + + public static int getBlockSize(File path) + { + return getBlockSize(path.getAbsolutePath()); + } + + public static native int getBlockSize(String path); + + static native void fallocate(int fd, long size); + + static native void fill(int fd, long size); + + static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException; +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java new file mode 100644 index 0000000..bf88d65 --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jlibaio; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * This is an extension to use libaio. + */ +public final class LibaioFile +{ + protected boolean open; + /** + * This represents a structure allocated on the native + * this is a io_context_t + */ + final LibaioContext ctx; + + private int fd; + + LibaioFile(int fd, LibaioContext ctx) + { + this.ctx = ctx; + this.fd = fd; + } + + public int getBlockSize() + { + return 512; + // FIXME + //return LibaioContext.getBlockSizeFD(fd); + } + + public boolean lock() + { + return LibaioContext.lock(fd); + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException + { + open = false; + LibaioContext.close(fd); + } + + /** + * @return The size of the file. + */ + public long getSize() + { + return LibaioContext.getSize(fd); + } + + /** + * It will submit a write to the queue. The callback sent here will be received on the + * {@link LibaioContext#poll(SubmitInfo[], int, int)} + * In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false. + *
+ * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite. + * And don't free the buffer until the callback was called as this could crash the VM. + * + * @param position The position on the file to write. Notice this has to be a multiple of 512. + * @param size The size of the buffer to use while writing. + * @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}. + * @param callback A callback to be returned on the poll method. + * @throws java.io.IOException in case of error + */ + public void write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException + { + ctx.submitWrite(fd, position, size, buffer, callback); + } + + /** + * It will submit a read to the queue. The callback sent here will be received on the + * {@link LibaioContext#poll(SubmitInfo[], int, int)}. + * In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false. + *
+ * Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite. + * And don't free the buffer until the callback was called as this could crash the VM. + * * + * + * @param position The position on the file to read. Notice this has to be a multiple of 512. + * @param size The size of the buffer to use while reading. + * @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}. + * @param callback A callback to be returned on the poll method. + * @throws java.io.IOException in case of error + * @see LibaioContext#poll(SubmitInfo[], int, int) + */ + public void read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException + { + ctx.submitRead(fd, position, size, buffer, callback); + } + + /** + * It will allocate a buffer to be used on libaio operations. + * Buffers here are allocated with posix_memalign. + *
+ * You need to explicitly free the buffer created from here using the + * {@link LibaioContext#freeBuffer(java.nio.ByteBuffer)}. + * + * @param size the size of the buffer. + * @return the buffer allocated. + */ + public ByteBuffer newBuffer(int size) + { + return LibaioContext.newAlignedBuffer(size, 512); + } + + /** + * It will preallocate the file with a given size. + * @param size number of bytes to be filled on the file + */ + public void fill(long size) + { + try + { + LibaioContext.fill(fd, size); + } + catch (OutOfMemoryError e) + { + NativeLogger.LOGGER.debug("Didn't have enough memory to allocate " + size + " bytes in memory, using simple fallocate"); + LibaioContext.fallocate(fd, size); + } + } + + /** + * It will use fallocate to initialize a file. + * @param size number of bytes to be filled on the file + */ + public void fallocate(long size) + { + LibaioContext.fallocate(fd, size); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java new file mode 100644 index 0000000..449c168 --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/NativeLogger.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jlibaio; + +import org.jboss.logging.BasicLogger; +import org.jboss.logging.Logger; +import org.jboss.logging.annotations.LogMessage; +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageLogger; + +/** + * Logger Code 14 + * + * each message id must be 6 digits long starting with 14, the 3rd digit denotes the level so + * + * INFO 1 + * WARN 2 + * DEBUG 3 + * ERROR 4 + * TRACE 5 + * FATAL 6 + * + * so an INFO message would be 1000 to 6000 + * + * NOTE(review): the only message declared here uses projectCode "jlibaio" and id 1001 at WARN level, + * which does not follow the numbering described above -- confirm which one is intended. + */ +@MessageLogger(projectCode = "jlibaio") +public interface NativeLogger extends BasicLogger +{ + /** + * The logger instance, named after this interface's package. 
+ */ + NativeLogger LOGGER = Logger.getMessageLogger(NativeLogger.class, NativeLogger.class.getPackage().getName()); + + /** + * Warning emitted when the native library that was loaded does not have the expected version. + */ + @LogMessage(level = Logger.Level.WARN) + @Message(id = 1001, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT) + void incompatibleNativeLibrary(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java new file mode 100644 index 0000000..47feea8 --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/SubmitInfo.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.jlibaio; + /** + * Callback contract for operations submitted through LibaioContext / LibaioFile. + * onError receives an error number and a message; done takes no arguments. + * According to the LibaioContext documentation the native layer invokes onError in case of failures, + * and LibaioContext forwards its native-invoked done hook to done(). + * NOTE(review): whether done() is also called after onError depends on which poll variant is used -- see the LibaioContext poll javadoc. + */ +public interface SubmitInfo +{ + void onError(int errno, String message); + + void done(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java new file mode 100644 index 0000000..341fa1c --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package handles Linux libaio at a low level. + *
+ * Buffers need to be specially allocated by {@link org.apache.activemq.artemis.jlibaio.LibaioContext#newAlignedBuffer(int, int)} + * as they need to be aligned to 512 or 4096 when using Direct files. + */ +package org.apache.activemq.artemis.jlibaio; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java new file mode 100644 index 0000000..ec2a630 --- /dev/null +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/util/CallbackCache.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.jlibaio.util; + + +import org.apache.activemq.artemis.jlibaio.SubmitInfo; + +/** + * This is a utility class where you can reuse Callback objects for your LibaioContext usage. 
+ */ +public class CallbackCache +{ + private final SubmitInfo[] pool; + + private int put = 0; + private int get = 0; + private int available = 0; + private final int size; + + private final Object lock = new Object(); + + public CallbackCache(int size) + { + this.pool = new SubmitInfo[size]; + this.size = size; + } + + public Callback get() + { + synchronized (lock) + { + if (available <= 0) + { + return null; + } + else + { + Callback retValue = (Callback)pool[get]; + pool[get] = null; + if (retValue == null) + { + throw new NullPointerException("You should initialize the pool before using it"); + } + if (retValue != null) + { + available--; + get++; + if (get >= size) + { + get = 0; + } + } + return retValue; + } + } + } + + public CallbackCache put(Callback callback) + { + if (callback == null) + { + return null; + } + synchronized (lock) + { + if (available < size) + { + available++; + pool[put++] = callback; + if (put >= size) + { + put = 0; + } + } + } + return this; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java ---------------------------------------------------------------------- diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java new file mode 100644 index 0000000..f62f8e2 --- /dev/null +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/CallbackCachelTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.jlibaio.test; + +import java.util.HashSet; + +import org.apache.activemq.artemis.jlibaio.util.CallbackCache; +import org.apache.activemq.artemis.jlibaio.SubmitInfo; +import org.junit.Assert; +import org.junit.Test; + +public class CallbackCachelTest +{ + @Test + public void testPartiallyInitialized() + { + CallbackCache pool = new CallbackCache(100); + + + for (int i = 0; i < 50; i++) + { + pool.put(new MyPool(i)); + } + + MyPool value = pool.get(); + + Assert.assertNotNull(value); + + pool.put(value); + + + // add and remove immediately + for (int i = 0; i < 777; i++) + { + pool.put(pool.get()); + } + + + HashSet hashValues = new HashSet<>(); + + + MyPool getValue; + while ((getValue = pool.get()) != null) + { + hashValues.add(getValue); + } + + + Assert.assertEquals(50, hashValues.size()); + } + + static class MyPool implements SubmitInfo + { + public final int i; + + MyPool(int i) + { + this.i = i; + } + + + public int getI() + { + return i; + } + + @Override + public void onError(int errno, String message) + { + } + + @Override + public void done() + { + + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MyPool myPool = (MyPool) o; + + if (i != myPool.i) return false; + + return true; + } + + @Override + public int hashCode() + { + return i; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java 
---------------------------------------------------------------------- diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java new file mode 100644 index 0000000..e46caf3 --- /dev/null +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.jlibaio.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.jlibaio.SubmitInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * This test is using a different package from {@link LibaioFile} + * as I need to validate public methods on the API + */ +public class LibaioTest +{ + + @BeforeClass + public static void testAIO() + { + Assume.assumeTrue(LibaioContext.isLoaded()); + } + + /** This is just an arbitrary number for a number of elements you need to pass to the libaio init method + * Some of the tests are using half of this number, so if anyone decide to change this please use an even number. + */ + private static final int LIBAIO_QUEUE_SIZE = 50; + + @Rule + public TemporaryFolder temporaryFolder; + + public LibaioContext control; + + @Before + public void setUpFactory() + { + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true); + } + + @After + public void deleteFactory() + { + control.close(); + validateLibaio(); + } + + public void validateLibaio() + { + Assert.assertEquals(0, LibaioContext.getTotalMaxIO()); + } + + public LibaioTest() + { + /* + * I didn't use /tmp for three reasons + * - Most systems now will use tmpfs which is not compatible with O_DIRECT + * - This would fill up /tmp in case of failures. 
+ * - target is cleaned up every time you do a mvn clean, so it's safer + */ + File parent = new File("./target"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + @Test + public void testOpen() throws Exception + { + LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true); + fileDescriptor.close(); + } + + @Test + public void testInitAndFallocate() throws Exception + { + LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true); + fileDescriptor.fallocate(1024 * 1024); + + ByteBuffer buffer = fileDescriptor.newBuffer(1024 * 1024); + fileDescriptor.read(0, 1024 * 1024, buffer, new TestInfo()); + + TestInfo[] callbacks = new TestInfo[1]; + control.poll(callbacks, 1, 1); + + fileDescriptor.close(); + + + buffer.position(0); + + LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true); + fileDescriptor2.fill(1024 * 1024); + fileDescriptor2.read(0, 1024 * 1024, buffer, new TestInfo()); + + control.poll(callbacks, 1, 1); + for (int i = 0; i < 1024 * 1024; i++) + { + Assert.assertEquals(0, buffer.get()); + } + + LibaioContext.freeBuffer(buffer); + } + + @Test + public void testSubmitWriteOnTwoFiles() throws Exception + { + + File file1 = temporaryFolder.newFile("test.bin"); + File file2 = temporaryFolder.newFile("test2.bin"); + + fillupFile(file1, LIBAIO_QUEUE_SIZE / 2); + fillupFile(file2, LIBAIO_QUEUE_SIZE / 2); + + LibaioFile[] fileDescriptor = new LibaioFile[]{control.openFile(file1, true), + control.openFile(file2, true)}; + + Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[0].getSize()); + Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[1].getSize()); + Assert.assertEquals(fileDescriptor[0].getBlockSize(), fileDescriptor[1].getBlockSize()); + Assert.assertEquals(LibaioContext.getBlockSize(temporaryFolder.getRoot()), LibaioContext.getBlockSize(file1)); + Assert.assertEquals(LibaioContext.getBlockSize(file1), 
LibaioContext.getBlockSize(file2)); + System.out.println("blockSize = " + fileDescriptor[0].getBlockSize()); + System.out.println("blockSize /tmp= " + LibaioContext.getBlockSize("/tmp")); + + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + + try + { + for (int i = 0; i < 512; i++) + { + buffer.put((byte) 'a'); + } + + TestInfo callback = new TestInfo(); + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + for (int i = 0; i < LIBAIO_QUEUE_SIZE / 2; i++) + { + for (LibaioFile file : fileDescriptor) + { + file.write(i * 512, 512, buffer, callback); + } + } + + Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE)); + + for (Object returnedCallback : callbacks) + { + Assert.assertSame(returnedCallback, callback); + } + + for (LibaioFile file : fileDescriptor) + { + ByteBuffer bigbuffer = LibaioContext.newAlignedBuffer(512 * 25, 512); + file.read(0, 512 * 25, bigbuffer, callback); + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + for (Object returnedCallback : callbacks) + { + Assert.assertSame(returnedCallback, callback); + } + + for (int i = 0; i < 512 * 25; i++) + { + Assert.assertEquals((byte) 'a', bigbuffer.get()); + } + + LibaioContext.freeBuffer(bigbuffer); + + file.close(); + } + } + finally + { + LibaioContext.freeBuffer(buffer); + } + } + + @Test + public void testSubmitWriteAndRead() throws Exception + { + TestInfo callback = new TestInfo(); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true); + + // ByteBuffer buffer = ByteBuffer.allocateDirect(512); + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + + try + { + for (int i = 0; i < 512; i++) + { + buffer.put((byte) 'a'); + } + + buffer.rewind(); + + fileDescriptor.write(0, 512, buffer, callback); + + int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE); + Assert.assertEquals(1, retValue); + 
+ Assert.assertSame(callback, callbacks[0]); + + LibaioContext.freeBuffer(buffer); + + buffer = LibaioContext.newAlignedBuffer(512, 512); + + for (int i = 0; i < 512; i++) + { + buffer.put((byte) 'B'); + } + + fileDescriptor.write(0, 512, buffer, null); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + buffer.rewind(); + + fileDescriptor.read(0, 512, buffer, null); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + for (int i = 0; i < 512; i++) + { + Assert.assertEquals('B', buffer.get()); + } + } + finally + { + LibaioContext.freeBuffer(buffer); + fileDescriptor.close(); + } + } + + @Test + /** + * This file is making use of libaio without O_DIRECT + * We won't need special buffers on this case. + */ + public void testSubmitWriteAndReadRegularBuffers() throws Exception + { + TestInfo callback = new TestInfo(); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + File file = temporaryFolder.newFile("test.bin"); + + fillupFile(file, LIBAIO_QUEUE_SIZE); + + LibaioFile fileDescriptor = control.openFile(file, false); + + final int BUFFER_SIZE = 50; + + ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + + try + { + for (int i = 0; i < BUFFER_SIZE; i++) + { + buffer.put((byte) 'a'); + } + + buffer.rewind(); + + fileDescriptor.write(0, BUFFER_SIZE, buffer, callback); + + int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE); + System.out.println("Return from poll::" + retValue); + Assert.assertEquals(1, retValue); + + Assert.assertSame(callback, callbacks[0]); + + buffer.rewind(); + + for (int i = 0; i < BUFFER_SIZE; i++) + { + buffer.put((byte) 'B'); + } + + fileDescriptor.write(0, BUFFER_SIZE, buffer, null); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + buffer.rewind(); + + fileDescriptor.read(0, 50, buffer, null); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + for (int i = 0; i < BUFFER_SIZE; i++) + { + 
Assert.assertEquals('B', buffer.get()); + } + } + finally + { + fileDescriptor.close(); + } + } + + @Test + public void testSubmitRead() throws Exception + { + + TestInfo callback = new TestInfo(); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + File file = temporaryFolder.newFile("test.bin"); + + fillupFile(file, LIBAIO_QUEUE_SIZE); + + LibaioFile fileDescriptor = control.openFile(file, true); + + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + + final int BUFFER_SIZE = 512; + try + { + for (int i = 0; i < BUFFER_SIZE; i++) + { + buffer.put((byte) '@'); + } + + fileDescriptor.write(0, BUFFER_SIZE, buffer, callback); + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + Assert.assertSame(callback, callbacks[0]); + + buffer.rewind(); + + fileDescriptor.read(0, BUFFER_SIZE, buffer, callback); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + Assert.assertSame(callback, callbacks[0]); + + for (int i = 0; i < BUFFER_SIZE; i++) + { + Assert.assertEquals('@', buffer.get()); + } + } + finally + { + LibaioContext.freeBuffer(buffer); + fileDescriptor.close(); + } + } + + @Test + public void testInvalidWrite() throws Exception + { + + TestInfo callback = new TestInfo(); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + File file = temporaryFolder.newFile("test.bin"); + + fillupFile(file, LIBAIO_QUEUE_SIZE); + + LibaioFile fileDescriptor = control.openFile(file, true); + + try + { + ByteBuffer buffer = ByteBuffer.allocateDirect(300); + for (int i = 0; i < 300; i++) + { + buffer.put((byte) 'z'); + } + + fileDescriptor.write(0, 300, buffer, callback); + + Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE)); + + Assert.assertTrue(callbacks[0].isError()); + + // Error condition + Assert.assertSame(callbacks[0], callback); + + System.out.println("Error:" + callbacks[0]); + + buffer = fileDescriptor.newBuffer(512); + for (int i = 0; i < 512; i++) + { + buffer.put((byte) 
'z'); + } + + callback = new TestInfo(); + + fileDescriptor.write(0, 512, buffer, callback); + + Assert.assertEquals(1, control.poll(callbacks, 1, 1)); + + Assert.assertSame(callback, callbacks[0]); + + fileDescriptor.write(5, 512, buffer, callback); + + Assert.assertEquals(1, control.poll(callbacks, 1, 1)); + + Assert.assertTrue(callbacks[0].isError()); + + callbacks = null; + callback = null; + + TestInfo.checkLeaks(); + } + finally + { + fileDescriptor.close(); + } + } + + @Test + public void testLeaks() throws Exception + { + File file = temporaryFolder.newFile("test.bin"); + + fillupFile(file, LIBAIO_QUEUE_SIZE * 2); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + + LibaioFile fileDescriptor = control.openFile(file, true); + + ByteBuffer bufferWrite = LibaioContext.newAlignedBuffer(512, 512); + + try + { + for (int i = 0; i < 512; i++) + { + bufferWrite.put((byte) 'B'); + } + + for (int j = 0; j < LIBAIO_QUEUE_SIZE * 2; j++) + { + for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) + { + TestInfo countClass = new TestInfo(); + fileDescriptor.write(i * 512, 512, bufferWrite, countClass); + } + + Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE)); + + for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) + { + Assert.assertNotNull(callbacks[i]); + callbacks[i] = null; + } + } + + TestInfo.checkLeaks(); + } + finally + { + LibaioContext.freeBuffer(bufferWrite); + } + } + + @Test + public void testLock() throws Exception + { + File file = temporaryFolder.newFile("test.bin"); + + LibaioFile fileDescriptor = control.openFile(file, true); + fileDescriptor.lock(); + + fileDescriptor.close(); + } + + @Test + public void testAlloc() throws Exception + { + File file = temporaryFolder.newFile("test.bin"); + + LibaioFile fileDescriptor = control.openFile(file, true); + fileDescriptor.fill(10 * 1024 * 1024); + + fileDescriptor.close(); + } + + @Test + public void testReleaseNullBuffer() throws Exception + { + boolean failed = 
false; + try + { + LibaioContext.freeBuffer(null); + } + catch (Exception expected) + { + failed = true; + } + + Assert.assertTrue("Exception happened!", failed); + + } + + @Test + public void testMemset() throws Exception + { + + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512 * 8, 512); + + for (int i = 0; i < buffer.capacity(); i++) + { + buffer.put((byte) 'z'); + } + + buffer.position(0); + + for (int i = 0; i < buffer.capacity(); i++) + { + Assert.assertEquals((byte) 'z', buffer.get()); + } + + control.memsetBuffer(buffer); + + buffer.position(0); + + for (int i = 0; i < buffer.capacity(); i++) + { + Assert.assertEquals((byte) 0, buffer.get()); + } + + LibaioContext.freeBuffer(buffer); + + } + + @Test + public void testIOExceptionConditions() throws Exception + { + boolean exceptionThrown = false; + + control.close(); + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false); + try + { + // There is no space for a queue this huge, the native layer should throw the exception + LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false); + } + catch (RuntimeException e) + { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + exceptionThrown = false; + + try + { + // this should throw an exception, we shouldn't be able to open a directory! 
+ control.openFile(temporaryFolder.getRoot(), true); + } + catch (IOException expected) + { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + + exceptionThrown = false; + + LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile(), true); + fileDescriptor.close(); + try + { + fileDescriptor.close(); + } + catch (IOException expected) + { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + + fileDescriptor = control.openFile(temporaryFolder.newFile(), true); + + ByteBuffer buffer = fileDescriptor.newBuffer(512); + + try + { + for (int i = 0; i < 512; i++) + { + buffer.put((byte) 'a'); + } + + for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) + { + fileDescriptor.write(i * 512, 512, buffer, new TestInfo()); + } + + boolean ex = false; + try + { + fileDescriptor.write(0, 512, buffer, new TestInfo()); + } + catch (Exception e) + { + ex = true; + } + + Assert.assertTrue(ex); + + TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE]; + Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE)); + + // it should be possible to write now after queue space being released + fileDescriptor.write(0, 512, buffer, new TestInfo()); + Assert.assertEquals(1, control.poll(callbacks, 1, 100)); + + TestInfo errorCallback = new TestInfo(); + // odd positions will have failures through O_DIRECT + fileDescriptor.read(3, 512, buffer, errorCallback); + Assert.assertEquals(1, control.poll(callbacks, 1, 50)); + Assert.assertTrue(callbacks[0].isError()); + Assert.assertSame(errorCallback, (callbacks[0])); + + // to help GC and the checkLeaks + callbacks = null; + errorCallback = null; + + TestInfo.checkLeaks(); + + exceptionThrown = false; + try + { + LibaioContext.newAlignedBuffer(300, 512); + } + catch (RuntimeException e) + { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + + exceptionThrown = false; + try + { + LibaioContext.newAlignedBuffer(-512, 512); + } + catch 
(RuntimeException e) + { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + } + finally + { + LibaioContext.freeBuffer(buffer); + } + } + + @Test + public void testBlockedCallback() throws Exception + { + final LibaioContext blockedContext = new LibaioContext(500, true); + Thread t = new Thread() + { + public void run() + { + blockedContext.poll(); + } + }; + + t.start(); + + + int NUMBER_OF_BLOCKS = 5000; + + final CountDownLatch latch = new CountDownLatch(NUMBER_OF_BLOCKS); + + File file = temporaryFolder.newFile("sub-file.txt"); + LibaioFile aioFile = blockedContext.openFile(file, true); + aioFile.fill(NUMBER_OF_BLOCKS * 512); + + final AtomicInteger errors = new AtomicInteger(0); + + class MyCallback implements SubmitInfo + { + @Override + public void onError(int errno, String message) + { + errors.incrementAndGet(); + } + + @Override + public void done() + { + latch.countDown(); + } + } + + MyCallback callback = new MyCallback(); + + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + + + for (int i = 0; i < 512; i++) + { + buffer.put((byte)'a'); + } + + long start = System.currentTimeMillis(); + + for (int i = 0; i < NUMBER_OF_BLOCKS; i++) + { + aioFile.write(i * 512, 512, buffer, callback); + } + + long end = System.currentTimeMillis(); + + latch.await(); + + + System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start)); +// +// MultiThreadAsynchronousFileTest.debug((sync ? 
"Sync result:" : "Async result:") + " Records/Second = " + +// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS * +// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES * +// 1000 / +// (endTime - startTime) + +// " total time = " + +// (endTime - startTime) + +// " total number of records = " + +// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS * +// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES); + + Thread.sleep(100); + + blockedContext.close(); + t.join(); + } + + private void fillupFile(File file, int blocks) throws IOException + { + FileOutputStream fileOutputStream = new FileOutputStream(file); + byte[] bufferWrite = new byte[512]; + for (int i = 0; i < 512; i++) + { + bufferWrite[i] = (byte) 0; + } + + for (int i = 0; i < blocks; i++) + { + fileOutputStream.write(bufferWrite); + } + + fileOutputStream.close(); + } + + + static class TestInfo implements SubmitInfo + { + static AtomicInteger count = new AtomicInteger(); + + @Override + protected void finalize() throws Throwable + { + super.finalize(); + count.decrementAndGet(); + } + + public static void checkLeaks() throws InterruptedException + { + for (int i = 0; count.get() != 0 && i < 50; i++) + { + WeakReference reference = new WeakReference(new Object()); + while (reference.get() != null) + { + System.gc(); + Thread.sleep(100); + } + } + Assert.assertEquals(0, count.get()); + } + + boolean error = false; + String errorMessage; + int errno; + + public TestInfo() + { + count.incrementAndGet(); + } + + @Override + public void onError(int errno, String message) + { + this.errno = errno; + this.errorMessage = message; + this.error = true; + } + + @Override + public void done() + { + } + + public int getErrno() + { + return errno; + } + + public void setErrno(int errno) + { + this.errno = errno; + } + + public boolean isError() + { + return error; + } + + public void setError(boolean error) + { + this.error = error; + } + + public String getErrorMessage() + { + return errorMessage; + } + + public void 
setErrorMessage(String errorMessage) + { + this.errorMessage = errorMessage; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 71610b4..f74d6d7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.proton.plug; import java.util.concurrent.Executor; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -30,7 +31,6 @@ import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -275,7 +275,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se 
serverSession.send(message, false); - manager.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask() + manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override public void done() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index aa3f9e0..45260d5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -22,7 +22,7 @@ import io.netty.buffer.EmptyByteBuf; import io.netty.handler.codec.mqtt.MqttMessageType; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; @@ -183,7 +183,7 @@ public class MQTTPublishManager private void createMessageAck(final int messageId, final int qos) { - session.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask() + session.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override public void done()