activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [7/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring
Date Thu, 30 Jul 2015 09:14:29 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
new file mode 100644
index 0000000..61cd0fd
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+
+/**
+ * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
+ */
+public abstract class AbstractSequentialFileFactory implements SequentialFileFactory
+{
+
+   // Timeout used to wait executors to shutdown
+   protected static final int EXECUTOR_TIMEOUT = 60;
+
+   protected final File journalDir;
+
+   protected final TimedBuffer timedBuffer;
+
+   protected final int bufferSize;
+
+   protected final long bufferTimeout;
+
+   protected final int maxIO;
+
+   private final IOCriticalErrorListener critialErrorListener;
+
+   /**
+    * Asynchronous writes need to be done at another executor.
+    * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
+    * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
+    *  */
+   protected ExecutorService writeExecutor;
+
+   protected AbstractSequentialFileFactory(final File journalDir,
+                                        final boolean buffered,
+                                        final int bufferSize,
+                                        final int bufferTimeout,
+                                        final int maxIO,
+                                        final boolean logRates,
+                                        final IOCriticalErrorListener criticalErrorListener)
+   {
+      this.journalDir = journalDir;
+
+      if (buffered && bufferTimeout > 0)
+      {
+         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
+      }
+      else
+      {
+         timedBuffer = null;
+      }
+      this.bufferSize = bufferSize;
+      this.bufferTimeout = bufferTimeout;
+      this.critialErrorListener = criticalErrorListener;
+      this.maxIO = maxIO;
+   }
+
+   public void stop()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.stop();
+      }
+
+      if (isSupportsCallbacks() && writeExecutor != null)
+      {
+         writeExecutor.shutdown();
+
+         try
+         {
+            if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+            {
+               ActiveMQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace"));
+            }
+         }
+         catch (InterruptedException e)
+         {
+            throw new ActiveMQInterruptedException(e);
+         }
+      }
+   }
+
+   @Override
+   public File getDirectory()
+   {
+      return journalDir;
+   }
+
+   public void start()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.start();
+      }
+
+      if (isSupportsCallbacks())
+      {
+         writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
+                                                                                    true,
+                                                                                    AbstractSequentialFileFactory.getThisClassLoader()));
+      }
+   }
+
+   public int getMaxIO()
+   {
+      return maxIO;
+   }
+
+   @Override
+   public void onIOError(Exception exception, String message, SequentialFile file)
+   {
+      if (critialErrorListener != null)
+      {
+         critialErrorListener.onIOException(exception, message, file);
+      }
+   }
+
+   @Override
+   public void activateBuffer(final SequentialFile file)
+   {
+      if (timedBuffer != null)
+      {
+         file.setTimedBuffer(timedBuffer);
+      }
+   }
+
+   public void flush()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.flush();
+      }
+   }
+
+   public void deactivateBuffer()
+   {
+      if (timedBuffer != null)
+      {
+         // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
+         timedBuffer.flush();
+         timedBuffer.setObserver(null);
+      }
+   }
+
+   public void releaseBuffer(final ByteBuffer buffer)
+   {
+   }
+
+   /**
+    * Create the directory if it doesn't exist yet
+    */
+   public void createDirs() throws Exception
+   {
+      boolean ok = journalDir.mkdirs();
+      if (!ok)
+      {
+         throw new IOException("Failed to create directory " + journalDir);
+      }
+   }
+
+   public List<String> listFiles(final String extension) throws Exception
+   {
+      FilenameFilter fnf = new FilenameFilter()
+      {
+         public boolean accept(final File file, final String name)
+         {
+            return name.endsWith("." + extension);
+         }
+      };
+
+      String[] fileNames = journalDir.list(fnf);
+
+      if (fileNames == null)
+      {
+         return Collections.EMPTY_LIST;
+      }
+
+      return Arrays.asList(fileNames);
+   }
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return AbstractSequentialFileFactory.class.getClassLoader();
+         }
+      });
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java
new file mode 100644
index 0000000..ce21f2a
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import org.apache.activemq.artemis.core.journal.impl.SyncIOCompletion;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+public class DummyCallback extends SyncIOCompletion
+{
+   private static final DummyCallback instance = new DummyCallback();
+
+   public static DummyCallback getInstance()
+   {
+      return DummyCallback.instance;
+   }
+
+   public void done()
+   {
+   }
+
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode);
+   }
+
+   @Override
+   public void waitCompletion() throws Exception
+   {
+   }
+
+   @Override
+   public void storeLineUp()
+   {
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
new file mode 100644
index 0000000..41470e4
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+/**
+ * The interface used for AIO Callbacks.
+ */
+public interface IOCallback
+{
+   /**
+    * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk.
+    * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */
+   void done();
+
+   /**
+    * Method for error notifications.
+    * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
+   void onError(int errorCode, String errorMessage);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
new file mode 100644
index 0000000..f2da3e8
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+/**
+ * TODO Merge this with IOExceptionListener
+ */
+public interface IOCriticalErrorListener
+{
+   void onIOException(Exception code, String message, SequentialFile file);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java
new file mode 100644
index 0000000..5c855e5
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+public interface IOExceptionListener
+{
+   void onIOException(Exception exception, String message);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
new file mode 100644
index 0000000..cb9d070
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+
+public interface SequentialFile
+{
+
+   boolean isOpen();
+
+   boolean exists();
+
+   void open() throws Exception;
+
+   /**
+    * The maximum number of simultaneous writes accepted
+    * @param maxIO
+    * @throws Exception
+    */
+   void open(int maxIO, boolean useExecutor) throws Exception;
+
+   boolean fits(int size);
+
+   int getAlignment() throws Exception;
+
+   int calculateBlockStart(int position) throws Exception;
+
+   String getFileName();
+
+   void fill(int size) throws Exception;
+
+   void delete() throws IOException, InterruptedException, ActiveMQException;
+
+   void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+
+   void write(ActiveMQBuffer bytes, boolean sync) throws Exception;
+
+   void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception;
+
+   void write(EncodingSupport bytes, boolean sync) throws Exception;
+
+   /**
+    * Write directly to the file without using any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback);
+
+   /**
+    * Write directly to the file without using intermediate any buffer
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   int read(ByteBuffer bytes, IOCallback callback) throws Exception;
+
+   /**
+    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+    *           NIO). To be safe, use a buffer from the corresponding
+    *           {@link SequentialFileFactory#newBuffer(int)}.
+    */
+   int read(ByteBuffer bytes) throws Exception;
+
+   void position(long pos) throws IOException;
+
+   long position();
+
+   void close() throws Exception;
+
+   void sync() throws IOException;
+
+   long size() throws Exception;
+
+   void renameTo(String newFileName) throws Exception;
+
+   SequentialFile cloneFile();
+
+   void copyTo(SequentialFile newFileName) throws Exception;
+
+   void setTimedBuffer(TimedBuffer buffer);
+
+   /**
+    * Returns a native File of the file underlying this sequential file.
+    */
+   File getJavaFile();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
new file mode 100644
index 0000000..b9a72ca
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ *
+ * A SequentialFileFactory
+ */
+public interface SequentialFileFactory
+{
+   SequentialFile createSequentialFile(String fileName);
+
+   int getMaxIO();
+
+   /**
+    * Lists files that end with the given extension.
+    * <p>
+    * This method inserts a ".' before the extension.
+    * @param extension
+    * @return
+    * @throws Exception
+    */
+   List<String> listFiles(String extension) throws Exception;
+
+   boolean isSupportsCallbacks();
+
+   /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
+   void onIOError(Exception exception, String message, SequentialFile file);
+
+   /** used for cases where you need direct buffer outside of the journal context.
+    *  This is because the native layer has a method that can be reused in certain cases like paging */
+   ByteBuffer allocateDirectBuffer(int size);
+
+   /** used for cases where you need direct buffer outside of the journal context.
+    *  This is because the native layer has a method that can be reused in certain cases like paging */
+   void releaseDirectBuffer(ByteBuffer buffer);
+
+   /**
+    * Note: You need to release the buffer if is used for reading operations. You don't need to do
+    * it if using writing operations (AIO Buffer Lister will take of writing operations)
+    * @param size
+    * @return the allocated ByteBuffer
+    */
+   ByteBuffer newBuffer(int size);
+
+   void releaseBuffer(ByteBuffer buffer);
+
+   void activateBuffer(SequentialFile file);
+
+   void deactivateBuffer();
+
+   // To be used in tests only
+   ByteBuffer wrapBuffer(byte[] bytes);
+
+   int getAlignment();
+
+   int calculateBlockSize(int bytes);
+
+   File getDirectory();
+
+   void clearBuffer(ByteBuffer buffer);
+
+   void start();
+
+   void stop();
+
+   /**
+    * Creates the directory if it does not exist yet.
+    */
+   void createDirs() throws Exception;
+
+   void flush();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
new file mode 100644
index 0000000..7503681
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.aio;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQNativeIOError;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
+import org.apache.activemq.artemis.core.io.DummyCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.activemq.artemis.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+
+public class AIOSequentialFile extends AbstractSequentialFile
+{
+   private boolean opened = false;
+
+   private LibaioFile aioFile;
+
+   private final AIOSequentialFileFactory aioFactory;
+
+   private final ReusableLatch pendingCallbacks = new ReusableLatch();
+
+   /**
+    * Used to determine the next writing sequence
+    */
+   private final AtomicLong nextWritingSequence = new AtomicLong(0);
+
+   /**
+    * AIO can't guarantee ordering over callbacks.
+    * <br>
+    * We use this {@link PriorityQueue} to hold values until they are in order
+    */
+   final PriorityQueue<AIOSequentialFileFactory.AIOSequentialCallback> pendingCallbackList = new PriorityQueue<>();
+
+   /**
+    * Used to determine the next writing sequence.
+    * This is accessed from a single thread (the Poller Thread)
+    */
+   private long nextReadSequence = 0;
+
+
+   public AIOSequentialFile(final AIOSequentialFileFactory factory,
+                            final int bufferSize,
+                            final long bufferTimeoutMilliseconds,
+                            final File directory,
+                            final String fileName,
+                            final Executor writerExecutor)
+   {
+      super(directory, fileName, factory, writerExecutor);
+      this.aioFactory = factory;
+   }
+
+   public boolean isOpen()
+   {
+      return opened;
+   }
+
+   public int getAlignment()
+   {
+      checkOpened();
+
+      return aioFile.getBlockSize();
+   }
+
+   public int calculateBlockStart(final int position)
+   {
+      int alignment = getAlignment();
+
+      int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+      return pos;
+   }
+
+   public SequentialFile cloneFile()
+   {
+      return new AIOSequentialFile(aioFactory,
+                                   -1,
+                                   -1,
+                                   getFile().getParentFile(),
+                                   getFile().getName(),
+                                   writerExecutor);
+   }
+
+   @Override
+   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
+   {
+      if (!opened)
+      {
+         return;
+      }
+
+      super.close();
+
+      if (!pendingCallbacks.await(10, TimeUnit.SECONDS))
+      {
+         factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
+      }
+
+      opened = false;
+
+      timedBuffer = null;
+
+      aioFile.close();
+      aioFile = null;
+   }
+
+
+   public synchronized void fill(final int size) throws Exception
+   {
+      checkOpened();
+      aioFile.fill(size);
+
+      fileSize = aioFile.getSize();
+   }
+
+   public void open() throws Exception
+   {
+      open(aioFactory.getMaxIO(), true);
+   }
+
+   public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException
+   {
+      opened = true;
+
+      try
+      {
+         aioFile = aioFactory.libaioContext.openFile(getFile(), true);
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(e, e.getMessage(), this);
+         throw new ActiveMQNativeIOError(e.getMessage(), e);
+      }
+
+      position.set(0);
+
+      fileSize = aioFile.getSize();
+   }
+
+   public int read(final ByteBuffer bytes, final IOCallback callback) throws ActiveMQException
+   {
+      checkOpened();
+      int bytesToRead = bytes.limit();
+
+      long positionToRead = position.getAndAdd(bytesToRead);
+
+      bytes.rewind();
+
+      try
+      {
+         // We don't send the buffer to the callback on read,
+         // because we want the buffer available.
+         // Sending it through the callback would make it released
+         aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null));
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(e, e.getMessage(), this);
+         throw new ActiveMQNativeIOError(e.getMessage(), e);
+      }
+
+      return bytesToRead;
+   }
+
+   public int read(final ByteBuffer bytes) throws Exception
+   {
+      SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
+
+      int bytesRead = read(bytes, waitCompletion);
+
+      waitCompletion.waitCompletion();
+
+      return bytesRead;
+   }
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         writeDirect(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         writeDirect(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   /**
+    *
+    * Note: Parameter sync is not used on AIO
+    *  */
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
+   {
+      checkOpened();
+
+      final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+      final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+      AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes);
+      runnableCallback.initWrite(positionToWrite, bytesToWrite);
+      if (writerExecutor != null)
+      {
+         writerExecutor.execute(runnableCallback);
+      }
+      else
+      {
+         runnableCallback.run();
+      }
+   }
+
+
+
+   AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer)
+   {
+      AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
+      callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
+      pendingCallbacks.countUp();
+      return callback;
+   }
+
+
+   void done(AIOSequentialFileFactory.AIOSequentialCallback callback)
+   {
+      if (callback.writeSequence == -1)
+      {
+         callback.sequentialDone();
+         pendingCallbacks.countDown();
+      }
+
+
+      if (callback.writeSequence == nextReadSequence)
+      {
+         nextReadSequence++;
+         callback.sequentialDone();
+         pendingCallbacks.countDown();
+         flushCallbacks();
+      }
+      else
+      {
+         pendingCallbackList.add(callback);
+      }
+
+   }
+
+   private void flushCallbacks()
+   {
+      while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence)
+      {
+         AIOSequentialFileFactory.AIOSequentialCallback callback = pendingCallbackList.poll();
+         callback.sequentialDone();
+         nextReadSequence++;
+         pendingCallbacks.countDown();
+      }
+   }
+
+   public void sync()
+   {
+      throw new UnsupportedOperationException("This method is not supported on AIO");
+   }
+
+   public long size() throws Exception
+   {
+      if (aioFile == null)
+      {
+         return getFile().length();
+      }
+      else
+      {
+         return aioFile.getSize();
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "AIOSequentialFile:" + getFile().getAbsolutePath();
+   }
+
+   // Protected methods
+   // -----------------------------------------------------------------------------------------------------
+
+   @Override
+   protected ByteBuffer newBuffer(int size, int limit)
+   {
+      size = factory.calculateBlockSize(size);
+      limit = factory.calculateBlockSize(limit);
+
+      ByteBuffer buffer = factory.newBuffer(size);
+      buffer.limit(limit);
+      return buffer;
+   }
+
+   // Private methods
+   // -----------------------------------------------------------------------------------------------------
+
+   private void checkOpened()
+   {
+      if (aioFile == null || !opened)
+      {
+         throw new NullPointerException("File not opened, file=null");
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
new file mode 100644
index 0000000..39dad2f
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.aio;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalConstants;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.jlibaio.SubmitInfo;
+import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+
+public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory
+{
+   private static final boolean trace = ActiveMQJournalLogger.LOGGER.isTraceEnabled();
+
+   private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+
+   private volatile boolean reuseBuffers = true;
+
+   private ExecutorService pollerExecutor;
+
+   volatile LibaioContext<AIOSequentialCallback> libaioContext;
+
+   private final CallbackCache<AIOSequentialCallback> callbackPool;
+
+   private final AtomicBoolean running = new AtomicBoolean(false);
+
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging
+   // Journal
+   private static void trace(final String message)
+   {
+      ActiveMQJournalLogger.LOGGER.trace(message);
+   }
+
+   public AIOSequentialFileFactory(final File journalDir, int maxIO)
+   {
+      this(journalDir,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+           maxIO,
+           false,
+           null);
+   }
+
+   public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO)
+   {
+      this(journalDir,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
+           maxIO,
+           false,
+           listener);
+   }
+
+   public AIOSequentialFileFactory(final File journalDir,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final int maxIO,
+                                   final boolean logRates)
+   {
+      this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null);
+   }
+
+   public AIOSequentialFileFactory(final File journalDir,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final int maxIO,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener);
+      callbackPool = new CallbackCache<>(maxIO);
+   }
+
+   public AIOSequentialCallback getCallback()
+   {
+      AIOSequentialCallback callback = callbackPool.get();
+      if (callback == null)
+      {
+         callback = new AIOSequentialCallback();
+      }
+
+      return callback;
+   }
+
+   public void enableBufferReuse()
+   {
+      this.reuseBuffers = true;
+   }
+
+   public void disableBufferReuse()
+   {
+      this.reuseBuffers = false;
+   }
+
+
+   public SequentialFile createSequentialFile(final String fileName)
+   {
+      return new AIOSequentialFile(this,
+                                   bufferSize,
+                                   bufferTimeout,
+                                   journalDir,
+                                   fileName,
+                                   writeExecutor);
+   }
+
+   public boolean isSupportsCallbacks()
+   {
+      return true;
+   }
+
+   public static boolean isSupported()
+   {
+      return LibaioContext.isLoaded();
+   }
+
+   public ByteBuffer allocateDirectBuffer(final int size)
+   {
+
+      int blocks = size / 512;
+      if (size % 512 != 0)
+      {
+         blocks++;
+      }
+
+      // The buffer on AIO has to be a multiple of 512
+      ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * 512, 512);
+
+      buffer.limit(size);
+
+      return buffer;
+   }
+
+   public void releaseDirectBuffer(final ByteBuffer buffer)
+   {
+      LibaioContext.freeBuffer(buffer);
+   }
+
+   public ByteBuffer newBuffer(int size)
+   {
+      if (size % 512 != 0)
+      {
+         size = (size / 512 + 1) * 512;
+      }
+
+      return buffersControl.newBuffer(size);
+   }
+
+   public void clearBuffer(final ByteBuffer directByteBuffer)
+   {
+      directByteBuffer.position(0);
+      libaioContext.memsetBuffer(directByteBuffer);
+   }
+
+   public int getAlignment()
+   {
+      return 512;
+   }
+
+   // For tests only
+   public ByteBuffer wrapBuffer(final byte[] bytes)
+   {
+      ByteBuffer newbuffer = newBuffer(bytes.length);
+      newbuffer.put(bytes);
+      return newbuffer;
+   }
+
+   public int calculateBlockSize(final int position)
+   {
+      int alignment = getAlignment();
+
+      int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+      return pos;
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.io.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+    */
+   @Override
+   public synchronized void releaseBuffer(final ByteBuffer buffer)
+   {
+      LibaioContext.freeBuffer(buffer);
+   }
+
+   @Override
+   public void start()
+   {
+      if (running.compareAndSet(false, true))
+      {
+         super.start();
+
+         this.libaioContext = new LibaioContext(maxIO, true);
+
+         this.running.set(true);
+
+         pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this),
+                                                                                  true,
+                                                                                  AIOSequentialFileFactory.getThisClassLoader()));
+
+         pollerExecutor.execute(new PollerRunnable());
+      }
+
+   }
+
+   @Override
+   public void stop()
+   {
+      if (this.running.compareAndSet(true, false))
+      {
+         buffersControl.stop();
+
+         libaioContext.close();
+         libaioContext = null;
+
+         if (pollerExecutor != null)
+         {
+            pollerExecutor.shutdown();
+
+            try
+            {
+               if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+               {
+                  ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace"));
+               }
+            }
+            catch (InterruptedException e)
+            {
+               throw new ActiveMQInterruptedException(e);
+            }
+         }
+
+         super.stop();
+      }
+   }
+
+   @Override
+   protected void finalize()
+   {
+      stop();
+   }
+
+   /**
+    * The same callback is used for Runnable executor.
+    * This way we can save some memory over the pool.
+    */
+   public class AIOSequentialCallback implements SubmitInfo, Runnable, Comparable<AIOSequentialCallback>
+   {
+      IOCallback callback;
+      boolean error = false;
+      AIOSequentialFile sequentialFile;
+      ByteBuffer buffer;
+      LibaioFile libaioFile;
+      String errorMessage;
+      int errorCode = -1;
+      long writeSequence;
+
+      long position;
+      int bytes;
+
+      @Override
+      public String toString()
+      {
+         return "AIOSequentialCallback{" +
+            "error=" + error +
+            ", errorMessage='" + errorMessage + '\'' +
+            ", errorCode=" + errorCode +
+            ", writeSequence=" + writeSequence +
+            ", position=" + position +
+            '}';
+      }
+
+      public AIOSequentialCallback initWrite(long positionToWrite, int bytesToWrite)
+      {
+         this.position = positionToWrite;
+         this.bytes = bytesToWrite;
+         return this;
+      }
+
+      public void run()
+      {
+         try
+         {
+            libaioFile.write(position, bytes, buffer, this);
+         }
+         catch (IOException e)
+         {
+            callback.onError(-1, e.getMessage());
+         }
+      }
+
+      public int compareTo(AIOSequentialCallback other)
+      {
+         if (this == other || this.writeSequence == other.writeSequence)
+         {
+            return 0;
+         }
+         else if (other.writeSequence < this.writeSequence)
+         {
+            return 1;
+         }
+         else
+         {
+            return -1;
+         }
+      }
+
+      public AIOSequentialCallback init(long writeSequence, IOCallback IOCallback, LibaioFile libaioFile, AIOSequentialFile sequentialFile, ByteBuffer usedBuffer)
+      {
+         this.callback = IOCallback;
+         this.sequentialFile = sequentialFile;
+         this.error = false;
+         this.buffer = usedBuffer;
+         this.libaioFile = libaioFile;
+         this.writeSequence = writeSequence;
+         this.errorMessage = null;
+         return this;
+      }
+
+      @Override
+      public void onError(int errno, String message)
+      {
+         this.error = true;
+         this.errorCode = errno;
+         this.errorMessage = message;
+      }
+
+      /**
+       * this is called by libaio.
+       */
+      public void done()
+      {
+         this.sequentialFile.done(this);
+      }
+
+      /**
+       * This is callbed by the AIOSequentialFile, after determined the callbacks were returned in sequence
+       */
+      public void sequentialDone()
+      {
+
+         if (error)
+         {
+            callback.onError(errorCode, errorMessage);
+            errorMessage = null;
+         }
+         else
+         {
+            if (callback != null)
+            {
+               callback.done();
+            }
+
+            if (buffer != null && reuseBuffers)
+            {
+               buffersControl.bufferDone(buffer);
+            }
+
+            callbackPool.put(AIOSequentialCallback.this);
+         }
+      }
+   }
+
+   private class PollerRunnable implements Runnable
+   {
+      public void run()
+      {
+         libaioContext.poll();
+      }
+   }
+
+   /**
+    * Class that will control buffer-reuse
+    */
+   private class ReuseBuffersController
+   {
+      private volatile long bufferReuseLastTime = System.currentTimeMillis();
+
+      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+
+      private boolean stopped = false;
+
+      public ByteBuffer newBuffer(final int size)
+      {
+         // if a new buffer wasn't requested in 10 seconds, we clear the queue
+         // This is being done this way as we don't need another Timeout Thread
+         // just to cleanup this
+         if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+         {
+            if (AIOSequentialFileFactory.trace)
+            {
+               AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() +
+                                                 " elements");
+            }
+
+            bufferReuseLastTime = System.currentTimeMillis();
+
+            clearPoll();
+         }
+
+         // if a buffer is bigger than the configured-bufferSize, we just create a new
+         // buffer.
+         if (size > bufferSize)
+         {
+            return LibaioContext.newAlignedBuffer(size, 512);
+         }
+         else
+         {
+            // We need to allocate buffers following the rules of the storage
+            // being used (AIO/NIO)
+            int alignedSize = calculateBlockSize(size);
+
+            // Try getting a buffer from the queue...
+            ByteBuffer buffer = reuseBuffersQueue.poll();
+
+            if (buffer == null)
+            {
+               // if empty create a new one.
+               buffer = LibaioContext.newAlignedBuffer(size, 512);
+
+               buffer.limit(alignedSize);
+            }
+            else
+            {
+               clearBuffer(buffer);
+
+               // set the limit of the buffer to the bufferSize being required
+               buffer.limit(alignedSize);
+            }
+
+            buffer.rewind();
+
+            return buffer;
+         }
+      }
+
+      public synchronized void stop()
+      {
+         stopped = true;
+         clearPoll();
+      }
+
+      public synchronized void clearPoll()
+      {
+         ByteBuffer reusedBuffer;
+
+         while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+         {
+            releaseBuffer(reusedBuffer);
+         }
+      }
+
+      public void bufferDone(final ByteBuffer buffer)
+      {
+         synchronized (this)
+         {
+
+            if (stopped)
+            {
+               releaseBuffer(buffer);
+            }
+            else
+            {
+               bufferReuseLastTime = System.currentTimeMillis();
+
+               // If a buffer has any other than the configured bufferSize, the buffer
+               // will be just sent to GC
+               if (buffer.capacity() == bufferSize)
+               {
+                  reuseBuffersQueue.offer(buffer);
+               }
+               else
+               {
+                  releaseBuffer(buffer);
+               }
+            }
+         }
+      }
+   }
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return AIOSequentialFileFactory.class.getClassLoader();
+         }
+      });
+
+   }
+
+   @Override
+   public String toString()
+   {
+      return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped +
+         "):" + super.toString();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java
new file mode 100644
index 0000000..a184244
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.aio;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.apache.activemq.artemis.jlibaio.LibaioFile;
+
+public class ActiveMQFileLock extends FileLock
+{
+
+   private final LibaioFile file;
+
+   public ActiveMQFileLock(final LibaioFile handle)
+   {
+      super((FileChannel)null, 0, 0, false);
+      this.file = handle;
+   }
+
+   @Override
+   public boolean isValid()
+   {
+      return true;
+   }
+
+   @Override
+   public void release() throws IOException
+   {
+      file.close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
new file mode 100644
index 0000000..a61569a
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+public class TimedBuffer
+{
+   // Constants -----------------------------------------------------
+
+   // The number of tries on sleep before switching to spin
+   public static final int MAX_CHECKS_ON_SLEEP = 20;
+
+   // Attributes ----------------------------------------------------
+
+   private TimedBufferObserver bufferObserver;
+
+   // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+   // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+   // prevent that
+   private final Semaphore spinLimiter = new Semaphore(1);
+
+   private CheckTimer timerRunnable = new CheckTimer();
+
+   private final int bufferSize;
+
+   private final ActiveMQBuffer buffer;
+
+   private int bufferLimit = 0;
+
+   private List<IOCallback> callbacks;
+
+   private volatile int timeout;
+
+   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+   private volatile boolean pendingSync = false;
+
+   private Thread timerThread;
+
+   private volatile boolean started;
+
+   // We use this flag to prevent flush occurring between calling checkSize and addBytes
+   // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer
+   // can get in an inconsistent state
+   private boolean delayFlush;
+
+   // for logging write rates
+
+   private final boolean logRates;
+
+   private final AtomicLong bytesFlushed = new AtomicLong(0);
+
+   private final AtomicLong flushesDone = new AtomicLong(0);
+
+   private Timer logRatesTimer;
+
+   private TimerTask logRatesTimerTask;
+
+   private boolean useSleep = true;
+
+   // no need to be volatile as every access is synchronized
+   private boolean spinning = false;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public TimedBuffer(final int size, final int timeout, final boolean logRates)
+   {
+      bufferSize = size;
+
+      this.logRates = logRates;
+
+      if (logRates)
+      {
+         logRatesTimer = new Timer(true);
+      }
+      // Setting the interval for nano-sleeps
+
+      buffer = ActiveMQBuffers.fixedBuffer(bufferSize);
+
+      buffer.clear();
+
+      bufferLimit = 0;
+
+      callbacks = new ArrayList<IOCallback>();
+
+      this.timeout = timeout;
+   }
+
+   // for Debug purposes
+   public synchronized boolean isUseSleep()
+   {
+      return useSleep;
+   }
+
+   public synchronized void setUseSleep(boolean useSleep)
+   {
+      this.useSleep = useSleep;
+   }
+
+   public synchronized void start()
+   {
+      if (started)
+      {
+         return;
+      }
+
+      // Need to start with the spin limiter acquired
+      try
+      {
+         spinLimiter.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         throw new ActiveMQInterruptedException(e);
+      }
+
+      timerRunnable = new CheckTimer();
+
+      timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
+
+      timerThread.start();
+
+      if (logRates)
+      {
+         logRatesTimerTask = new LogRatesTimerTask();
+
+         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+      }
+
+      started = true;
+   }
+
+   public void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      flush();
+
+      bufferObserver = null;
+
+      timerRunnable.close();
+
+      spinLimiter.release();
+
+      if (logRates)
+      {
+         logRatesTimerTask.cancel();
+      }
+
+      while (timerThread.isAlive())
+      {
+         try
+         {
+            timerThread.join();
+         }
+         catch (InterruptedException e)
+         {
+            throw new ActiveMQInterruptedException(e);
+         }
+      }
+
+      started = false;
+   }
+
+   public synchronized void setObserver(final TimedBufferObserver observer)
+   {
+      if (bufferObserver != null)
+      {
+         flush();
+      }
+
+      bufferObserver = observer;
+   }
+
+   /**
+    * Verify if the size fits the buffer
+    *
+    * @param sizeChecked
+    */
+   public synchronized boolean checkSize(final int sizeChecked)
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("TimedBuffer is not started");
+      }
+
+      if (sizeChecked > bufferSize)
+      {
+         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+                                            ") on the journal");
+      }
+
+      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
+      {
+         // Either there is not enough space left in the buffer for the sized record
+         // Or a flush has just been performed and we need to re-calcualate bufferLimit
+
+         flush();
+
+         delayFlush = true;
+
+         final int remainingInFile = bufferObserver.getRemainingBytes();
+
+         if (sizeChecked > remainingInFile)
+         {
+            return false;
+         }
+         else
+         {
+            // There is enough space in the file for this size
+
+            // Need to re-calculate buffer limit
+
+            bufferLimit = Math.min(remainingInFile, bufferSize);
+
+            return true;
+         }
+      }
+      else
+      {
+         delayFlush = true;
+
+         return true;
+      }
+   }
+
+   public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback)
+   {
+      addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+   }
+
+   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback)
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("TimedBuffer is not started");
+      }
+
+      delayFlush = false;
+
+      bytes.encode(buffer);
+
+      callbacks.add(callback);
+
+      if (sync)
+      {
+         pendingSync = true;
+
+         startSpin();
+      }
+
+   }
+
+   public void flush()
+   {
+      flush(false);
+   }
+
+   /**
+    * force means the Journal is moving to a new file. Any pending write need to be done immediately
+    * or data could be lost
+    */
+   public void flush(final boolean force)
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            throw new IllegalStateException("TimedBuffer is not started");
+         }
+
+         if ((force || !delayFlush) && buffer.writerIndex() > 0)
+         {
+            int pos = buffer.writerIndex();
+
+            if (logRates)
+            {
+               bytesFlushed.addAndGet(pos);
+            }
+
+            ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+
+            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+            // Using bufferToFlush.put(buffer) would make several append calls for each byte
+            // We also transfer the content of this buffer to the native file's buffer
+
+            bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
+
+            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+
+            stopSpin();
+
+            pendingSync = false;
+
+            // swap the instance as the previous callback list is being used asynchronously
+            callbacks = new LinkedList<IOCallback>();
+
+            buffer.clear();
+
+            bufferLimit = 0;
+
+            flushesDone.incrementAndGet();
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private class LogRatesTimerTask extends TimerTask
+   {
+      private boolean closed;
+
+      private long lastExecution;
+
+      private long lastBytesFlushed;
+
+      private long lastFlushesDone;
+
+      @Override
+      public synchronized void run()
+      {
+         if (!closed)
+         {
+            long now = System.currentTimeMillis();
+
+            long bytesF = bytesFlushed.get();
+            long flushesD = flushesDone.get();
+
+            if (lastExecution != 0)
+            {
+               double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
+               ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
+               double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
+               ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
+            }
+
+            lastExecution = now;
+
+            lastBytesFlushed = bytesF;
+
+            lastFlushesDone = flushesD;
+         }
+      }
+
+      @Override
+      public synchronized boolean cancel()
+      {
+         closed = true;
+
+         return super.cancel();
+      }
+   }
+
+   private class CheckTimer implements Runnable
+   {
+      private volatile boolean closed = false;
+
+      int checks = 0;
+      int failedChecks = 0;
+      long timeBefore = 0;
+
+      final int sleepMillis = timeout / 1000000; // truncates
+      final int sleepNanos = timeout % 1000000;
+
+
+      public void run()
+      {
+         long lastFlushTime = 0;
+
+         while (!closed)
+         {
+            // We flush on the timer if there are pending syncs there and we've waited at least one
+            // timeout since the time of the last flush.
+            // Effectively flushing "resets" the timer
+            // On the timeout verification, notice that we ignore the timeout check if we are using sleep
+
+            if (pendingSync)
+            {
+               if (isUseSleep())
+               {
+                  // if using sleep, we will always flush
+                  flush();
+                  lastFlushTime = System.nanoTime();
+               }
+               else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout)
+               {
+                  // if not using flush we will spin and do the time checks manually
+                  flush();
+                  lastFlushTime = System.nanoTime();
+               }
+
+            }
+
+            sleepIfPossible();
+
+            try
+            {
+               spinLimiter.acquire();
+
+               Thread.yield();
+
+               spinLimiter.release();
+            }
+            catch (InterruptedException e)
+            {
+               throw new ActiveMQInterruptedException(e);
+            }
+         }
+      }
+
+      /**
+       * We will attempt to use sleep only if the system supports nano-sleep
+       * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
+       * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
+       */
+      private void sleepIfPossible()
+      {
+         if (isUseSleep())
+         {
+            if (checks < MAX_CHECKS_ON_SLEEP)
+            {
+               timeBefore = System.nanoTime();
+            }
+
+            try
+            {
+               sleep(sleepMillis, sleepNanos);
+            }
+            catch (InterruptedException e)
+            {
+               throw new ActiveMQInterruptedException(e);
+            }
+            catch (Exception e)
+            {
+               setUseSleep(false);
+               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
+            }
+
+            if (checks < MAX_CHECKS_ON_SLEEP)
+            {
+               long realTimeSleep = System.nanoTime() - timeBefore;
+
+               // I'm letting the real time to be up to 50% than the requested sleep.
+               if (realTimeSleep > timeout * 1.5)
+               {
+                  failedChecks++;
+               }
+
+               if (++checks >= MAX_CHECKS_ON_SLEEP)
+               {
+                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5)
+                  {
+                     ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+                     setUseSleep(false);
+                  }
+               }
+            }
+         }
+      }
+
+      public void close()
+      {
+         closed = true;
+      }
+   }
+
+   /**
+    * Sub classes (tests basically) can use this to override how the sleep is being done
+    *
+    * @param sleepMillis
+    * @param sleepNanos
+    * @throws InterruptedException
+    */
+   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException
+   {
+      Thread.sleep(sleepMillis, sleepNanos);
+   }
+
+   /**
+    * Sub classes (tests basically) can use this to override disabling spinning
+    */
+   protected void stopSpin()
+   {
+      if (spinning)
+      {
+         try
+         {
+            // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+            // when the buffer is inactive
+            spinLimiter.acquire();
+         }
+         catch (InterruptedException e)
+         {
+            throw new ActiveMQInterruptedException(e);
+         }
+
+         spinning = false;
+      }
+   }
+
+
+   /**
+    * Sub classes (tests basically) can use this to override disabling spinning
+    */
+   protected void startSpin()
+   {
+      if (!spinning)
+      {
+         spinLimiter.release();
+
+         spinning = true;
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
new file mode 100644
index 0000000..7a9659f
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+
+
+public interface TimedBufferObserver
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCallback> callbacks);
+
+   /** Return the number of remaining bytes that still fit on the observer (file) */
+   int getRemainingBytes();
+
+   ByteBuffer newBuffer(int size, int limit);
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
new file mode 100644
index 0000000..d20045e
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.nio;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+public final class NIOSequentialFile extends AbstractSequentialFile
+{
+   private FileChannel channel;
+
+   private RandomAccessFile rfile;
+
+   /**
+    * The write semaphore here is only used when writing asynchronously
+    */
+   private Semaphore maxIOSemaphore;
+
+   private final int defaultMaxIO;
+
+   private int maxIO;
+
+   public NIOSequentialFile(final SequentialFileFactory factory,
+                            final File directory,
+                            final String file,
+                            final int maxIO,
+                            final Executor writerExecutor)
+   {
+      super(directory, file, factory, writerExecutor);
+      defaultMaxIO = maxIO;
+   }
+
+   public int getAlignment()
+   {
+      return 1;
+   }
+
+   public int calculateBlockStart(final int position)
+   {
+      return position;
+   }
+
+   public synchronized boolean isOpen()
+   {
+      return channel != null;
+   }
+
+   /**
+    * this.maxIO represents the default maxIO.
+    * Some operations while initializing files on the journal may require a different maxIO
+    */
+   public synchronized void open() throws IOException
+   {
+      open(defaultMaxIO, true);
+   }
+
+   public void open(final int maxIO, final boolean useExecutor) throws IOException
+   {
+      try
+      {
+         rfile = new RandomAccessFile(getFile(), "rw");
+
+         channel = rfile.getChannel();
+
+         fileSize = channel.size();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      if (writerExecutor != null && useExecutor)
+      {
+         maxIOSemaphore = new Semaphore(maxIO);
+         this.maxIO = maxIO;
+      }
+   }
+
+   public void fill(final int size) throws IOException
+   {
+      ByteBuffer bb = ByteBuffer.allocate(size);
+
+      bb.limit(size);
+      bb.position(0);
+
+      try
+      {
+         channel.position(0);
+         channel.write(bb);
+         channel.force(false);
+         channel.position(0);
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      fileSize = channel.size();
+   }
+
+   @Override
+   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
+   {
+      super.close();
+
+      if (maxIOSemaphore != null)
+      {
+         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+         {
+            ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
+         }
+      }
+
+      maxIOSemaphore = null;
+      try
+      {
+         if (channel != null)
+         {
+            channel.close();
+         }
+
+         if (rfile != null)
+         {
+            rfile.close();
+         }
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+      channel = null;
+
+      rfile = null;
+
+      notifyAll();
+   }
+
+   public int read(final ByteBuffer bytes) throws Exception
+   {
+      return read(bytes, null);
+   }
+
+   public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException,
+      ActiveMQIllegalStateException
+   {
+      try
+      {
+         if (channel == null)
+         {
+            throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
+         }
+         int bytesRead = channel.read(bytes);
+
+         if (callback != null)
+         {
+            callback.done();
+         }
+
+         bytes.flip();
+
+         return bytesRead;
+      }
+      catch (IOException e)
+      {
+         if (callback != null)
+         {
+            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
+         }
+
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+
+         throw e;
+      }
+   }
+
+   public void sync() throws IOException
+   {
+      if (channel != null)
+      {
+         try
+         {
+            channel.force(false);
+         }
+         catch (IOException e)
+         {
+            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+            throw e;
+         }
+      }
+   }
+
+   public long size() throws IOException
+   {
+      if (channel == null)
+      {
+         return getFile().length();
+      }
+
+      try
+      {
+         return channel.size();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   @Override
+   public void position(final long pos) throws IOException
+   {
+      try
+      {
+         super.position(pos);
+         channel.position(pos);
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "NIOSequentialFile " + getFile();
+   }
+
+   public SequentialFile cloneFile()
+   {
+      return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
+   }
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
+   {
+      if (callback == null)
+      {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+
+      try
+      {
+         internalWrite(bytes, sync, callback);
+      }
+      catch (Exception e)
+      {
+         callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
+      }
+   }
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
+   {
+      internalWrite(bytes, sync, null);
+   }
+
+   public void writeInternal(final ByteBuffer bytes) throws Exception
+   {
+      internalWrite(bytes, true, null);
+   }
+
+   @Override
+   protected ByteBuffer newBuffer(int size, final int limit)
+   {
+      // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
+
+      size = limit;
+
+      return super.newBuffer(size, limit);
+   }
+
+   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException
+   {
+      if (!isOpen())
+      {
+         if (callback != null)
+         {
+            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
+         }
+         else
+         {
+            throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
+         }
+         return;
+      }
+
+      position.addAndGet(bytes.limit());
+
+      if (maxIOSemaphore == null || callback == null)
+      {
+         // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
+         try
+         {
+            doInternalWrite(bytes, sync, callback);
+         }
+         catch (IOException e)
+         {
+            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+      }
+      else
+      {
+         // This is a flow control on writing, just like maxAIO on libaio
+         maxIOSemaphore.acquire();
+
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  try
+                  {
+                     doInternalWrite(bytes, sync, callback);
+                  }
+                  catch (IOException e)
+                  {
+                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
+                     factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
+                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+                  }
+                  catch (Throwable e)
+                  {
+                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
+                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+                  }
+               }
+               finally
+               {
+                  maxIOSemaphore.release();
+               }
+            }
+         });
+      }
+   }
+
+   /**
+    * @param bytes
+    * @param sync
+    * @param callback
+    * @throws IOException
+    * @throws Exception
+    */
+   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException
+   {
+      channel.write(bytes);
+
+      if (sync)
+      {
+         sync();
+      }
+
+      if (callback != null)
+      {
+         callback.done();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
new file mode 100644
index 0000000..e64e405
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.nio;
+
+import java.io.File;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalConstants;
+
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
+{
+   public NIOSequentialFileFactory(final File journalDir, final int maxIO)
+   {
+      this(journalDir, null, maxIO);
+   }
+
+   public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO)
+   {
+      this(journalDir,
+           false,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+           maxIO,
+           false,
+           listener);
+   }
+
+   public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO)
+   {
+      this(journalDir, buffered, null, maxIO);
+   }
+
+   public NIOSequentialFileFactory(final File journalDir,
+                                   final boolean buffered,
+                                   final IOCriticalErrorListener listener, final int maxIO)
+   {
+      this(journalDir,
+           buffered,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+           maxIO,
+           false,
+           listener);
+   }
+
+   public NIOSequentialFileFactory(final File journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final int maxIO,
+                                   final boolean logRates)
+   {
+      this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null);
+   }
+
+   public NIOSequentialFileFactory(final File journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final int maxIO,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
+   }
+
+   public SequentialFile createSequentialFile(final String fileName)
+   {
+      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
+   }
+
+   public boolean isSupportsCallbacks()
+   {
+      return timedBuffer != null;
+   }
+
+
+   public ByteBuffer allocateDirectBuffer(final int size)
+   {
+      // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
+      ByteBuffer buffer2 = null;
+      try
+      {
+         buffer2 = ByteBuffer.allocateDirect(size);
+      }
+      catch (OutOfMemoryError error)
+      {
+         // This is a workaround for the way the JDK will deal with native buffers.
+         // the main portion is outside of the VM heap
+         // and the JDK will not have any reference about it to take GC into account
+         // so we force a GC and try again.
+         WeakReference<Object> obj = new WeakReference<Object>(new Object());
+         try
+         {
+            long timeout = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() > timeout && obj.get() != null)
+            {
+               System.gc();
+               Thread.sleep(100);
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+
+         buffer2 = ByteBuffer.allocateDirect(size);
+
+      }
+      return buffer2;
+   }
+
+   public void releaseDirectBuffer(ByteBuffer buffer)
+   {
+      // nothing we can do on this case. we can just have good faith on GC
+   }
+
+   public ByteBuffer newBuffer(final int size)
+   {
+      return ByteBuffer.allocate(size);
+   }
+
+   public void clearBuffer(final ByteBuffer buffer)
+   {
+      final int limit = buffer.limit();
+      buffer.rewind();
+
+      for (int i = 0; i < limit; i++)
+      {
+         buffer.put((byte)0);
+      }
+
+      buffer.rewind();
+   }
+
+   public ByteBuffer wrapBuffer(final byte[] bytes)
+   {
+      return ByteBuffer.wrap(bytes);
+   }
+
+   public int getAlignment()
+   {
+      return 1;
+   }
+
+   public int calculateBlockSize(final int bytes)
+   {
+      return bytes;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java
deleted file mode 100644
index 09c80ca..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-import org.apache.activemq.artemis.core.asyncio.AIOCallback;
-
-/**
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.apache.activemq.artemis.core.asynciio.AIOCallback from the journal.
- */
-public interface IOAsyncTask extends AIOCallback
-{
-}


Mime
View raw message