Author: chirino Date: Thu Nov 19 17:26:41 2009 New Revision: 882212 URL: http://svn.apache.org/viewvc?rev=882212&view=rev Log: Adding a higher level FileDescriptor interface for working with AIO. Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java?rev=882212&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java (added) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/AioPollAgent.java Thu Nov 19 17:26:41 2009 @@ -0,0 +1,148 @@ +package org.apache.activemq.syscall; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.syscall.jni.AIO; + +import static java.util.concurrent.TimeUnit.*; + 
+import static org.apache.activemq.syscall.jni.AIO.*; +import static org.apache.activemq.syscall.jni.CLibrary.*; +import static org.apache.activemq.syscall.jni.AIO.timespec.*; + +public class AioPollAgent { + + private static final int ASYNC_THREAD_SHUTDOWN_DELAY = 1000; + public static final AioPollAgent MAIN_POLL_AGENT = new AioPollAgent(); + public static AioPollAgent getMainPollAgent() { + return MAIN_POLL_AGENT; + } + + long timeoutp; + LinkedHashMap<Long, Callback<Long>> callbackMap = new LinkedHashMap<Long, Callback<Long>>(); + private Thread thread; + + public void setTimeOut(long value, TimeUnit unit) { + synchronized (this) { + if( timeoutp == NULL ) { + timeoutp = malloc(AIO.timespec.SIZEOF); + if( timeoutp == NULL ) { + throw new OutOfMemoryError(); + } + } + memmove(timeoutp, timespec(value, unit), AIO.timespec.SIZEOF); + } + } + + static public timespec timespec(long value, TimeUnit unit) { + return timespec(unit.toMillis(value)); + } + + static public timespec timespec(long ms) { + timespec t = new timespec(); + t.tv_sec = ms/1000; + ms = ms%1000; + t.tv_nsec = 1000000*ms; // 50 milli seconds + return t; + } + + public void watch(long aiocbp, Callback<Long> callback) { + assert aiocbp!=NULL; + synchronized (this) { + if( callbackMap.isEmpty() && thread==null) { + if( timeoutp==NULL ) { + // Default the timeout.. 
+ setTimeOut(10, MILLISECONDS); + } + thread = new Thread("AioPollAgent") { + public void run() { + process(); + } + }; + thread.setDaemon(false); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); + } + callbackMap.put(aiocbp, callback); + notify(); + } + } + + private void process() { + long[] aiocbps; + while ((aiocbps = dequeueBlocks())!=null ) { + process(aiocbps); + } + } + + private long[] dequeueBlocks() { + long blocks[]; + synchronized (this) { + if( callbackMap.isEmpty() ) { + try { + wait(ASYNC_THREAD_SHUTDOWN_DELAY); + if( callbackMap.isEmpty() ) { + thread=null; + return null; + } + } catch (InterruptedException e) { + thread=null; + return null; + } + } + Set<Long> keys = callbackMap.keySet(); + blocks = new long[keys.size()]; + int i=0; + for (Long value : keys) { + blocks[i++]=value; + } + } + return blocks; + } + + private void process(long[] aiocbps) { + int rc = aio_suspend(aiocbps, aiocbps.length, timeoutp); + if (rc == 0) { + for (int i = 0; i < aiocbps.length; i++) { + rc = aio_error(aiocbps[i]); + if( rc==EINPROGRESS ) + continue; + + // The io has now completed.. free up the memory allocated for + // the aio control block, a remove it from the callback map + Callback<Long> callback = removeBlock(aiocbps[i]); + free(aiocbps[i]); + + // Let the callback know that the IO completed. 
+ try { + if( rc==0 ) { + callback.onSuccess(aio_return(aiocbps[i])); + } else { + callback.onFailure(new IOException(string(strerror(errno())))); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + } + + private Callback<Long> removeBlock(long aiocbp) { + synchronized (this) { + return callbackMap.remove(aiocbp); + } + } + + protected void finalize() throws Throwable { + synchronized(this) { + if ( timeoutp!=NULL ) { + free(timeoutp); + timeoutp=NULL; + } + } + } + +} Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java?rev=882212&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java (added) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/Callback.java Thu Nov 19 17:26:41 2009 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.syscall; + + +/** + * @author Hiram Chirino + * @param <T> + */ +public interface Callback<T> { + public void onSuccess(T result); + public void onFailure(Throwable exception); +} \ No newline at end of file Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java?rev=882212&r1=882211&r2=882212&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java (original) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FileDescriptor.java Thu Nov 19 17:26:41 2009 @@ -16,12 +16,13 @@ */ package org.apache.activemq.syscall; +import java.io.File; import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; import org.apache.activemq.syscall.jni.IO; +import org.apache.activemq.syscall.jni.AIO.aiocb; +import static org.apache.activemq.syscall.jni.AIO.*; import static org.apache.activemq.syscall.jni.CLibrary.*; /** @@ -32,11 +33,16 @@ private final int fd; boolean opened; + private AioPollAgent aioPollAgent; public FileDescriptor(int fd) { this.fd = fd; } + public static FileDescriptor open(File file, int oflags, int mode) throws IOException { + return open(file.getPath(), oflags, mode); + } + public static FileDescriptor open(String path, int oflags, int mode) throws IOException { int fd = IO.open(path, oflags, mode); if( fd== -1 ) { @@ -72,8 +78,55 @@ return fd; } - public void write(NativeAllocation writeBuffer, Callable callback) { + /** + * does an async write, the callback gets executed once the write completes. 
+ * + * @param buffer + * @param callback + */ + public void write(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException { + + aiocb cb = new aiocb(); + cb.aio_fildes = fd; + cb.aio_offset = offset; + cb.aio_buf = buffer.pointer(); + cb.aio_nbytes = buffer.length(); + + long aiocbp = malloc(aiocb.SIZEOF); + if( aiocbp==NULL ) { + throw new OutOfMemoryError(); + } + aiocb.memmove(aiocbp, cb, aiocb.SIZEOF); + aio_write(aiocbp); + + AioPollAgent agent = getAioPollAgent(); + agent.watch(aiocbp, callback); return; } + private AioPollAgent getAioPollAgent() { + if( aioPollAgent==null ) { + aioPollAgent = AioPollAgent.getMainPollAgent(); + } + return aioPollAgent; + } + + public void setAioPollAgent(AioPollAgent aioPollAgent) { + this.aioPollAgent = aioPollAgent; + } + + /** + * does an async read, the callback gets executed once the read completes. + * + * @param buffer + * @param callback + */ + public void read(long offset, NativeAllocation buffer, Callback<Long> callback) throws IOException { + + + return; + } + + + } Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java?rev=882212&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java (added) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/FutureCallback.java Thu Nov 19 17:26:41 2009 @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.syscall; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A Callback implementation which provides a Future interface. + * + * @author Hiram Chirino + * + * @param <T> + */ +public class FutureCallback<T> implements Callback<T>, Future<T> { + + private CountDownLatch done = new CountDownLatch(1); + volatile private Throwable exception; + volatile private T result; + + public void onFailure(Throwable exception) { + this.exception = exception; + done.countDown(); + } + + public void onSuccess(T result) { + this.result = result; + done.countDown(); + } + + public T get() throws InterruptedException, ExecutionException { + done.await(); + return internalGet(); + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if ( done.await(timeout, unit) ) { + return internalGet(); + } + throw new TimeoutException(); + } + + public boolean isDone() { + return done.getCount()==0; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + /** + * This future cannot be canceled. It always throws UnsupportedOperationException. 
+ * + * @throws UnsupportedOperationException + */ + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + private T internalGet() throws ExecutionException { + if( exception!=null ) { + throw new ExecutionException(exception); + } + return result; + } + } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java?rev=882212&r1=882211&r2=882212&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java (original) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/main/java/org/apache/activemq/syscall/jni/AIO.java Thu Nov 19 17:26:41 2009 @@ -105,9 +105,9 @@ public static int SIZEOF; @JniField(cast="time_t") - long tv_sec; + public long tv_sec; @JniField(cast="long") - long tv_nsec; + public long tv_nsec; @JniMethod(conditional="#ifdef HAVE_AIO_H") public static final native void memmove ( Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java?rev=882212&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java (added) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/FileDescriptorTest.java Thu Nov 19 17:26:41 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.syscall; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.activemq.syscall.jni.AIO; +import org.junit.Test; + +import static org.apache.activemq.syscall.NativeAllocation.*; +import static org.apache.activemq.syscall.TestSupport.*; +import static org.apache.activemq.syscall.jni.IO.*; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; +import static org.junit.Assume.*; + +/** + * + * @author Hiram Chirino + */ +public class FileDescriptorTest { + + @Test + public void writeWithACallback() throws IOException, InterruptedException, ExecutionException, TimeoutException { + assumeThat(AIO.SUPPORTED, is(true)); + + String expected = generateString(1024*4); + NativeAllocation buffer = allocate(expected); + + File file = dataFile(FileDescriptorTest.class.getName()+".writeWithACallback.data"); + + int oflags = O_NONBLOCK | O_CREAT | O_TRUNC | O_RDWR; + int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH; + FileDescriptor fd = FileDescriptor.open(file, oflags, mode); + + try { + FutureCallback<Long> future = new FutureCallback<Long>(); + fd.write(0, buffer, future); + long count = future.get(1, 
TimeUnit.SECONDS); + + assertEquals(count, buffer.length()); + } finally { + fd.dispose(); + } + + assertEquals(expected, readFile(file)); + } + +} Added: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java?rev=882212&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java (added) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/TestSupport.java Thu Nov 19 17:26:41 2009 @@ -0,0 +1,54 @@ +package org.apache.activemq.syscall; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + +public class TestSupport { + public static String generateString(int size) { + StringBuffer sb = new StringBuffer(); + for( int i=0; i < size; i++ ) { + sb.append((char)('a'+(i%26))); + } + String expected = sb.toString(); + return expected; + } + + public static File dataFile(String name) { + name.replace('/', File.separatorChar); + String basedir = System.getProperty("basedir", "."); + File f = new File(basedir); + f = new File(f, "target"); + f = new File(f, "test-data"); + f.mkdirs(); + return new File(f, name); + } + + static public String readFile(File file) throws FileNotFoundException, IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FileInputStream is = new FileInputStream(file); + try { + int c=0; + while( (c=is.read())>=0 ) { + baos.write(c); + } + } finally { + is.close(); + } + String actual = new String(baos.toByteArray()); + return actual; + } + + static public void writeFile(File file, String content) throws 
FileNotFoundException, IOException { + FileOutputStream os = new FileOutputStream(file); + try { + os.write(content.getBytes()); + } finally { + os.close(); + } + } + +} Modified: activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java?rev=882212&r1=882211&r2=882212&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java (original) +++ activemq/sandbox/activemq-apollo/activemq-syscall/src/test/java/org/apache/activemq/syscall/jni/AIOTest.java Thu Nov 19 17:26:41 2009 @@ -13,25 +13,21 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- */package org.apache.activemq.syscall.jni; + */ +package org.apache.activemq.syscall.jni; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import org.apache.activemq.syscall.NativeAllocation; -import org.apache.activemq.syscall.jni.AIO; import org.apache.activemq.syscall.jni.AIO.aiocb; import org.junit.Test; +import static org.apache.activemq.syscall.NativeAllocation.*; +import static org.apache.activemq.syscall.TestSupport.*; import static org.apache.activemq.syscall.jni.AIO.*; import static org.apache.activemq.syscall.jni.CLibrary.*; import static org.apache.activemq.syscall.jni.IO.*; - -import static org.apache.activemq.syscall.NativeAllocation.*; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; import static org.junit.Assume.*; @@ -47,16 +43,9 @@ public void write() throws IOException, InterruptedException { assumeThat(AIO.SUPPORTED, is(true)); - File file = new File("target/test-data/test.data"); - file.getParentFile().mkdirs(); + File file = dataFile(AIOTest.class.getName()+".write.data"); - // Setup a buffer holds the data that we will be writing.. - StringBuffer sb = new StringBuffer(); - for( int i=0; i < 1024*4; i++ ) { - sb.append((char)('a'+(i%26))); - } - - String expected = sb.toString(); + String expected = generateString(1024*4); NativeAllocation writeBuffer = allocate(expected); long aiocbp = malloc(aiocb.SIZEOF); @@ -64,8 +53,9 @@ try { // open the file... + int oflags = O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR; int mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH; - int fd = open(file.getCanonicalPath(), O_NONBLOCK | O_CREAT | O_TRUNC| O_RDWR, mode); + int fd = open(file.getCanonicalPath(), oflags, mode); checkrc(fd); // Create a control block.. 
@@ -109,35 +99,10 @@ } } - // Read the file in and verify the contents is what we expect - String actual = loadContent(file); - assertEquals(expected, actual); + assertEquals(expected, readFile(file)); } - private String loadContent(File file) throws FileNotFoundException, IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - FileInputStream is = new FileInputStream(file); - try { - int c=0; - while( (c=is.read())>=0 ) { - baos.write(c); - } - } finally { - is.close(); - } - String actual = new String(baos.toByteArray()); - return actual; - } - private void storeContent(File file, String content) throws FileNotFoundException, IOException { - FileOutputStream os = new FileOutputStream(file); - try { - os.write(content.getBytes()); - } finally { - os.close(); - } - } - private void checkrc(int rc) throws IOException { if( rc==-1 ) { throw new IOException("IO failure: "+string(strerror(errno())));