directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhe...@apache.org
Subject svn commit: r478948 - in /directory/sandbox/mheath/aio/trunk/src: main/c/org_apache_aio.cpp test/java/AIOTest.java
Date Fri, 24 Nov 2006 19:14:33 GMT
Author: mheath
Date: Fri Nov 24 11:14:32 2006
New Revision: 478948

URL: http://svn.apache.org/viewvc?view=rev&rev=478948
Log:
Added asynch write support.

Modified:
    directory/sandbox/mheath/aio/trunk/src/main/c/org_apache_aio.cpp
    directory/sandbox/mheath/aio/trunk/src/test/java/AIOTest.java

Modified: directory/sandbox/mheath/aio/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/directory/sandbox/mheath/aio/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=478948&r1=478947&r2=478948
==============================================================================
--- directory/sandbox/mheath/aio/trunk/src/main/c/org_apache_aio.cpp (original)
+++ directory/sandbox/mheath/aio/trunk/src/main/c/org_apache_aio.cpp Fri Nov 24 11:14:32 2006
@@ -31,7 +31,8 @@
 {
 	JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
 	JNIEXPORT void JNICALL JNI_OnUnload(JavaVM *vm, void *reserved);
-	void aio_completion_handler(sigval_t sigval);
+	void aio_read_completion_handler(sigval_t sigval);
+	void aio_write_completion_handler(sigval_t sigval);
 }
 
 // --- Utility Functions ----------------------------------------------------
@@ -48,6 +49,13 @@
 	return env->CallIntMethod(buffer, MID_position); 
 }
 
+inline void setBufferPosition(JNIEnv *env, jobject buffer, jint position)
+{
+	jclass cls = env->GetObjectClass(buffer);
+	jmethodID MID_position = env->GetMethodID(cls, "position", "(I)Ljava/nio/Buffer;");
+	env->CallVoidMethod(buffer, MID_position, position);
+}
+
 inline jint getBufferLimit(JNIEnv *env, jobject buffer)
 {
 	jclass cls = env->GetObjectClass(buffer);
@@ -74,13 +82,58 @@
 	return env->NewGlobalRef(future);
 }
 
+struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
+{
+		// Get address and capacity of buffer
+	if (buffer == NULL)
+	{
+		env->ThrowNew(nullPointerException, "buffer cannot be null");
+	}
+	// TODO: Add support for non direct byte buffers
+	jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
+	if (bufferAddress == NULL)
+	{
+		env->ThrowNew(ioException, "Must use direct ByteBuffer");
+		return NULL;
+	}
+	// Adjust bufferAddress by the position of the buffer
+	jint bufferPosition = getBufferPosition(env, buffer); 
+	bufferAddress += bufferPosition; 
+	jint bufferSize = getBufferLimit(env, buffer) - position;
+
+	// Allocate aiocb
+	struct aiocb *req = (aiocb*)malloc(sizeof(aiocb));
+	bzero(req, sizeof(struct aiocb));
+	
+	jobject future = createPosixAioFutureReadWrite(env, asynchronousFileChannel, operation, req, buffer, position);
+
+	// Setup aiocb
+	req->aio_fildes = getFD(env, asynchronousFileChannel);
+	req->aio_offset = position;
+	req->aio_buf = bufferAddress;
+	req->aio_nbytes = bufferSize;
+	
+	req->aio_sigevent.sigev_notify = SIGEV_THREAD;
+	if (operation == operationRead)
+	{
+		req->aio_sigevent.sigev_notify_function = aio_read_completion_handler;
+	} else if (operation == operationWrite) {
+		req->aio_sigevent.sigev_notify_function = aio_write_completion_handler;
+	}
+	req->aio_sigevent.sigev_notify_attributes = NULL;
+	req->aio_sigevent.sigev_value.sival_ptr = future;
+	
+	return req;
+}
+
 // --- OnLoad and OnUnload Functions ----------------------------------------
 JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved)
 {
 	jint JNIversion = JNI_VERSION_1_4;
 	if (DEBUG)
 	{
-		printf("Initializing native code for JNI version 0x%x\n", JNIversion);
+		fprintf(stdout, "Initializing native code for JNI version 0x%x\n", JNIversion);
+		fflush(stdout);
 	}
 	
 	// Initialize static jvm pointer.
@@ -148,8 +201,20 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_write
   (JNIEnv *env, jobject obj, jobject buffer, jlong position)
 {
-	printf("write\n");
-	return NULL;
+	if (DEBUG)
+	{
+		fprintf(stdout, "aio write at file position %d\n", position);
+		fflush(stdout);
+	}
+	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationWrite);
+	int ret = aio_write(req);
+	if (ret)
+	{
+		// TODO Handle errors from ret
+		// return a null on error.
+		return NULL;
+	}
+	return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
@@ -157,55 +222,25 @@
 {
 	if (DEBUG)
 	{
-		printf("aio read at file position %d\n", position);
+		fprintf(stdout, "aio read at file position %d\n", position);
+		fflush(stdout);
 	}
-
-	// Get address and capacity of buffer
-	if (buffer == NULL)
-	{
-		env->ThrowNew(nullPointerException, "buffer cannot be null");
-	}
-	// TODO: Add support for non direct byte buffers
-	jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
-	if (bufferAddress == NULL)
-	{
-		env->ThrowNew(ioException, "Must use direct ByteBuffer");
-		return NULL;
-	}
-	// Adjust bufferAddress by the position of the buffer
-	jint bufferPosition = getBufferPosition(env, buffer); 
-	bufferAddress += bufferPosition; 
-	jint bufferAvailability = getBufferLimit(env, buffer) - position;
-
-	// Allocate aiocb
-	struct aiocb *req = (aiocb*)malloc(sizeof(aiocb));
-	bzero(req, sizeof(struct aiocb));
-	
-	jobject future = createPosixAioFutureReadWrite(env, obj, operationRead, req, buffer, position);
-
-	// Setup aiocb
-	req->aio_fildes = getFD(env, obj);
-	req->aio_offset = position;
-	req->aio_buf = bufferAddress;
-	req->aio_nbytes = bufferAvailability;
-	
-	req->aio_sigevent.sigev_notify = SIGEV_THREAD;
-	req->aio_sigevent.sigev_notify_function = aio_completion_handler;
-	req->aio_sigevent.sigev_notify_attributes = NULL;
-	req->aio_sigevent.sigev_value.sival_ptr = future;
-
+	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationRead);
 	if (DEBUG)
 	{
-		printf("Do aio read\n");
+		fprintf(stdout, "Do aio read\n");
+		fflush(stdout);
 	}
 	int ret = aio_read(req);
 	if (ret)
 	{
-	// TODO Handle errors from ret
+		// TODO Handle errors from ret
+		// return a null on error.
+		return NULL;
 	}
 
 	// Return future object	
-	return future;
+	return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
@@ -242,19 +277,20 @@
 	return 0;
 }
 
-// --- Completion handler ----------------------------------------------------------------
-void aio_completion_handler(sigval_t sigval)
+// --- Completion handlers ----------------------------------------------------------------
+void aio_read_completion_handler(sigval_t sigval)
 {
 	if (DEBUG)
 	{
-		printf("In completion handler\n");
+		fprintf(stdout, "In read completion handler\n");
+		fflush(stdout);
 	}
 
 	JNIEnv *env;
 	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
 	if (res < 0)
 	{
-		fprintf(stderr, "Failed to attach JVM to AIO thread\n");
+		fprintf(stderr, "Failed to attach JVM to AIO read thread\n");
 		return;
 	}
 	
@@ -263,7 +299,7 @@
 	
 	/* Did the request complete? */
 	if (aio_error(req) == 0) {
-		/* Request completed successfully, get number of bytes read */
+		/* Request completed successfully, get number of bytes processed */
 		int ret = aio_return( req );
 		
 		// Adjust buffer limit
@@ -278,7 +314,53 @@
 		free(req);
 		env->DeleteGlobalRef(future);
 	} else {
-		fprintf(stderr, "ERROR: AIO request did NOT complete\n");
+		// TODO Find a way to handle exception here
+		fprintf(stderr, "ERROR: AIO read request did NOT complete\n");
+		fflush(stderr);
+	}
+
+	jvm->DetachCurrentThread();
+	return;
+}
+
+void aio_write_completion_handler(sigval_t sigval)
+{
+	if (DEBUG)
+	{
+		fprintf(stdout, "In write completion handler\n");
+		fflush(stdout);
+	}
+
+	JNIEnv *env;
+	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
+	if (res < 0)
+	{
+		fprintf(stderr, "Failed to attach JVM to AIO write thread\n");
+		return;
+	}
+	
+	jobject future = (jobject)sigval.sival_ptr;
+	struct aiocb *req = (struct aiocb *)env->GetLongField(future, posixAioFutureReadWrite_aiocbPtrID);
+	
+	/* Did the request complete? */
+	if (aio_error(req) == 0) {
+		/* Request completed successfully, get number of bytes processed */
+		int ret = aio_return( req );
+		
+		// Adjust buffer position
+		jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+		int position = getBufferPosition(env, buffer) + ret;
+		setBufferPosition(env, buffer, position);
+		
+		// Call aio listeners
+		env->CallVoidMethod(future, posixAioFutureReadWrite_processFutureListenersID);
+		
+		// Free resources		
+		free(req);
+		env->DeleteGlobalRef(future);
+	} else {
+		// TODO Find a way to handle exception here
+		fprintf(stderr, "ERROR: AIO write request did NOT complete\n");
 		fflush(stderr);
 	}
 

Modified: directory/sandbox/mheath/aio/trunk/src/test/java/AIOTest.java
URL: http://svn.apache.org/viewvc/directory/sandbox/mheath/aio/trunk/src/test/java/AIOTest.java?view=diff&rev=478948&r1=478947&r2=478948
==============================================================================
--- directory/sandbox/mheath/aio/trunk/src/test/java/AIOTest.java (original)
+++ directory/sandbox/mheath/aio/trunk/src/test/java/AIOTest.java Fri Nov 24 11:14:32 2006
@@ -1,4 +1,5 @@
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.aio.AioFutureListener;
@@ -12,7 +13,6 @@
 
 		AsynchronousFileChannel achannel = new AsynchronousFileChannel(in.getFD());
 		
-		
 		AioFutureListener<AioFutureReadWrite> listener = new AioFutureListener<AioFutureReadWrite>() {
 					public void onCompletion(AioFutureReadWrite ioFuture) {
 						System.out.println("In callback");
@@ -25,6 +25,14 @@
 		AioFutureReadWrite future = achannel.read(buffer, 0);
 		future.addListener(listener);
 		future.join();
+		
+		FileOutputStream out = new FileOutputStream("/tmp/foo");
+		achannel = new AsynchronousFileChannel(out.getFD());
+		buffer.clear();
+		buffer.put("Have a really nice day!\n".getBytes());
+		buffer.flip();
+		future = achannel.write(buffer, 0);
+		System.out.println(future);
 	}
 		
 }



Mime
View raw message