activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [8/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring
Date Thu, 30 Jul 2015 09:14:30 GMT
ARTEMIS-163 First pass on the native AIO refactoring

https://issues.apache.org/jira/browse/ARTEMIS-163

On this pass I'm just converting the native layer to a simpler one.
It wasn't very easy to change the alignment at the current framework,
so I did some refactoring simplifying the native layer

The volume of the nubmer of changes here is because:

- The API is changed, we now don't close the libaio queue between files
- The native layer won't use malloc as much as it used to, saving some CPU and memory defragmentation
- I organized the code around nio and libaio


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6fe9e0eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6fe9e0eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6fe9e0eb

Branch: refs/heads/master
Commit: 6fe9e0ebd6504897ba2b3ce7185cc63a0dbc8feb
Parents: 661f695
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Jul 27 16:15:03 2015 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jul 29 22:12:03 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 CMakeLists.txt                                  |   18 +
 README.md                                       |    2 +-
 .../activemq/artemis/cli/commands/Create.java   |   90 +-
 .../cli/commands/tools/CompactJournal.java      |    6 +-
 .../cli/commands/tools/DecodeJournal.java       |    4 +-
 .../cli/commands/tools/EncodeJournal.java       |    6 +-
 .../cli/commands/tools/XmlDataExporter.java     |    6 +-
 .../cli/commands/util/SyncCalculation.java      |  190 ++++
 .../artemis/cli/commands/etc/broker.xml         |    1 +
 .../commands/etc/journal-buffer-settings.txt    |    8 +
 .../apache/activemq/cli/test/ArtemisTest.java   |   23 +-
 .../activemq/cli/test/StreamClassPathTest.java  |    1 +
 .../artemis/api/core/ActiveMQNativeIOError.java |    5 +
 .../journal/JMSJournalStorageManagerImpl.java   |    6 +-
 .../artemis/core/asyncio/AIOCallback.java       |   33 -
 .../artemis/core/asyncio/AsynchronousFile.java  |   58 -
 .../artemis/core/asyncio/BufferCallback.java    |   27 -
 .../core/asyncio/IOExceptionListener.java       |   22 -
 .../core/asyncio/impl/ActiveMQFileLock.java     |   47 -
 .../core/asyncio/impl/AsynchronousFileImpl.java |  822 --------------
 .../artemis/core/io/AbstractSequentialFile.java |  407 +++++++
 .../core/io/AbstractSequentialFileFactory.java  |  224 ++++
 .../activemq/artemis/core/io/DummyCallback.java |   49 +
 .../activemq/artemis/core/io/IOCallback.java    |   33 +
 .../core/io/IOCriticalErrorListener.java        |   25 +
 .../artemis/core/io/IOExceptionListener.java    |   22 +
 .../artemis/core/io/SequentialFile.java         |  116 ++
 .../artemis/core/io/SequentialFileFactory.java  |   91 ++
 .../artemis/core/io/aio/AIOSequentialFile.java  |  333 ++++++
 .../core/io/aio/AIOSequentialFileFactory.java   |  531 +++++++++
 .../artemis/core/io/aio/ActiveMQFileLock.java   |   47 +
 .../artemis/core/io/buffer/TimedBuffer.java     |  558 ++++++++++
 .../core/io/buffer/TimedBufferObserver.java     |   53 +
 .../artemis/core/io/nio/NIOSequentialFile.java  |  393 +++++++
 .../core/io/nio/NIOSequentialFileFactory.java   |  168 +++
 .../artemis/core/journal/IOAsyncTask.java       |   27 -
 .../artemis/core/journal/IOCompletion.java      |    4 +-
 .../core/journal/IOCriticalErrorListener.java   |   22 -
 .../activemq/artemis/core/journal/Journal.java  |    1 +
 .../artemis/core/journal/SequentialFile.java    |  129 ---
 .../core/journal/SequentialFileFactory.java     |   89 --
 .../core/journal/impl/AIOSequentialFile.java    |  326 ------
 .../journal/impl/AIOSequentialFileFactory.java  |  358 ------
 .../journal/impl/AbstractJournalUpdateTask.java |    8 +-
 .../journal/impl/AbstractSequentialFile.java    |  407 -------
 .../impl/AbstractSequentialFileFactory.java     |  218 ----
 .../core/journal/impl/DummyCallback.java        |   48 -
 .../core/journal/impl/FileWrapperJournal.java   |    2 +-
 .../artemis/core/journal/impl/JournalBase.java  |    1 +
 .../core/journal/impl/JournalCompactor.java     |    6 +-
 .../artemis/core/journal/impl/JournalFile.java  |    2 +-
 .../core/journal/impl/JournalFileImpl.java      |    2 +-
 .../journal/impl/JournalFilesRepository.java    |    8 +-
 .../artemis/core/journal/impl/JournalImpl.java  |   28 +-
 .../core/journal/impl/NIOSequentialFile.java    |  404 -------
 .../journal/impl/NIOSequentialFileFactory.java  |  168 ---
 .../core/journal/impl/SyncSpeedTest.java        |  354 ------
 .../artemis/core/journal/impl/TimedBuffer.java  |  558 ----------
 .../core/journal/impl/TimedBufferObserver.java  |   52 -
 .../core/journal/impl/TransactionCallback.java  |   12 +-
 .../artemis/core/io/aio/CallbackOrderTest.java  |   97 ++
 .../artemis/maven/ActiveMQCreatePlugin.java     |    1 +
 artemis-native/bin/libartemis-native-32.so      |  Bin 44082 -> 22260 bytes
 artemis-native/bin/libartemis-native-64.so      |  Bin 46624 -> 23984 bytes
 artemis-native/pom.xml                          |   24 +
 artemis-native/src/main/c/AIOController.cpp     |   63 --
 artemis-native/src/main/c/AIOController.h       |   51 -
 artemis-native/src/main/c/AIOException.h        |   75 --
 artemis-native/src/main/c/AsyncFile.cpp         |  348 ------
 artemis-native/src/main/c/AsyncFile.h           |   93 --
 artemis-native/src/main/c/CMakeLists.txt        |   26 +-
 artemis-native/src/main/c/CallbackAdapter.h     |   42 -
 artemis-native/src/main/c/JAIODatatypes.h       |   28 -
 .../src/main/c/JNICallbackAdapter.cpp           |   62 --
 artemis-native/src/main/c/JNICallbackAdapter.h  |   66 --
 .../src/main/c/JNI_AsynchronousFileImpl.cpp     |  377 -------
 artemis-native/src/main/c/JavaUtilities.cpp     |   62 --
 artemis-native/src/main/c/JavaUtilities.h       |   26 -
 artemis-native/src/main/c/LockClass.h           |   39 -
 artemis-native/src/main/c/Version.h             |   24 -
 artemis-native/src/main/c/exception_helper.h    |   23 +
 ...che_activemq_artemis_jlibaio_LibaioContext.c |  710 ++++++++++++
 .../activemq/artemis/core/libaio/Native.java    |   74 --
 .../activemq/artemis/jlibaio/LibaioContext.java |  446 ++++++++
 .../activemq/artemis/jlibaio/LibaioFile.java    |  152 +++
 .../activemq/artemis/jlibaio/NativeLogger.java  |   51 +
 .../activemq/artemis/jlibaio/SubmitInfo.java    |   25 +
 .../activemq/artemis/jlibaio/package-info.java  |   24 +
 .../artemis/jlibaio/util/CallbackCache.java     |   93 ++
 .../jlibaio/test/CallbackCachelTest.java        |  112 ++
 .../artemis/jlibaio/test/LibaioTest.java        |  859 +++++++++++++++
 .../plug/ProtonSessionIntegrationCallback.java  |    4 +-
 .../core/protocol/mqtt/MQTTPublishManager.java  |    4 +-
 .../openwire/OpenWireProtocolManager.java       |    4 +-
 .../protocol/stomp/StompProtocolManager.java    |    4 +-
 .../deployers/impl/FileConfigurationParser.java |    2 +-
 .../artemis/core/paging/PagingManager.java      |    2 +-
 .../artemis/core/paging/PagingStore.java        |    2 +-
 .../artemis/core/paging/PagingStoreFactory.java |    2 +-
 .../cursor/impl/PageSubscriptionImpl.java       |    6 +-
 .../activemq/artemis/core/paging/impl/Page.java |    4 +-
 .../core/paging/impl/PagingStoreFactoryNIO.java |    8 +-
 .../core/paging/impl/PagingStoreImpl.java       |   10 +-
 .../core/persistence/OperationContext.java      |    4 +-
 .../core/persistence/StorageManager.java        |    6 +-
 .../impl/journal/DescribeJournal.java           |    8 +-
 .../impl/journal/JournalStorageManager.java     |   33 +-
 .../impl/journal/LargeServerMessageImpl.java    |    2 +-
 .../impl/journal/LargeServerMessageInSync.java  |    4 +-
 .../impl/journal/OperationContextImpl.java      |   12 +-
 .../nullpm/NullStorageLargeServerMessage.java   |    2 +-
 .../impl/nullpm/NullStorageManager.java         |    8 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |    6 +-
 .../core/ServerSessionPacketHandler.java        |    4 +-
 .../core/replication/ReplicatedJournal.java     |    2 +-
 .../core/replication/ReplicationEndpoint.java   |    6 +-
 .../core/replication/ReplicationManager.java    |    2 +-
 .../core/server/ActiveMQServerLogger.java       |   10 +-
 .../artemis/core/server/LargeServerMessage.java |    2 +-
 .../core/server/cluster/impl/Redistributor.java |    4 +-
 .../server/impl/AIOFileLockNodeManager.java     |   29 +-
 .../core/server/impl/ActiveMQServerImpl.java    |   18 +-
 .../artemis/core/server/impl/QueueImpl.java     |    4 +-
 .../core/server/impl/ServerSessionImpl.java     |    4 +-
 .../core/transaction/impl/TransactionImpl.java  |    8 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |   66 +-
 .../tests/util/ColocatedActiveMQServer.java     |    7 +-
 .../integration/client/HangConsumerTest.java    |    6 +-
 .../integration/client/JournalCrashTest.java    |    4 +-
 .../client/LibaioDependencyCheckTest.java       |    4 +-
 .../tests/integration/client/PagingTest.java    |    8 +-
 .../client/RedeliveryConsumerTest.java          |    4 +-
 .../integration/cluster/bridge/BridgeTest.java  |    6 +-
 .../journal/AIOImportExportTest.java            |    6 +-
 .../journal/AIOJournalCompactTest.java          |    6 +-
 .../integration/journal/AIOJournalImplTest.java |   10 +-
 .../journal/AIOSequentialFileFactoryTest.java   |   10 +-
 .../journal/NIOBufferedJournalCompactTest.java  |    6 +-
 .../journal/NIOImportExportTest.java            |    6 +-
 .../journal/NIOJournalCompactTest.java          |   16 +-
 .../integration/journal/NIOJournalImplTest.java |    6 +-
 .../journal/NIONoBufferJournalImplTest.java     |    6 +-
 ...NIONonBufferedSequentialFileFactoryTest.java |    6 +-
 .../journal/NIOSequentialFileFactoryTest.java   |    6 +-
 .../journal/ValidateTransactionHealthTest.java  |   15 +-
 .../management/ActiveMQServerControlTest.java   |   12 +-
 .../replication/ReplicationTest.java            |   16 +-
 .../integration/server/FileLockTimeoutTest.java |    4 +-
 .../journal/FakeJournalImplTest.java            |    2 +-
 .../journal/JournalImplTestUnit.java            |   13 +-
 .../journal/RealJournalImplAIOTest.java         |    6 +-
 .../journal/RealJournalImplNIOTest.java         |    6 +-
 .../AIOAllPossibilitiesCompactStressTest.java   |    6 +-
 .../AIOMultiThreadCompactorStressTest.java      |    2 +-
 .../stress/journal/AddAndRemoveStressTest.java  |   16 +-
 .../stress/journal/CompactingStressTest.java    |   12 +-
 .../JournalCleanupCompactStressTest.java        |   50 +-
 .../stress/journal/MixupCompactorTestBase.java  |    6 +-
 .../NIOMultiThreadCompactorStressTest.java      |    8 +-
 .../core/journal/impl/AIOJournalImplTest.java   |    6 +-
 .../core/journal/impl/FakeJournalImplTest.java  |    2 +-
 .../core/journal/impl/JournalImplTestUnit.java  |   15 +-
 .../core/journal/impl/NIOJournalImplTest.java   |    6 +-
 .../tests/unit/core/asyncio/AIOTestBase.java    |   18 +-
 .../unit/core/asyncio/AsynchronousFileTest.java | 1015 ------------------
 .../MultiThreadAsynchronousFileTest.java        |   81 +-
 .../journal/impl/AlignedJournalImplTest.java    |   14 +-
 .../unit/core/journal/impl/CleanBufferTest.java |   16 +-
 .../core/journal/impl/FakeJournalImplTest.java  |    2 +-
 .../impl/FakeSequentialFileFactoryTest.java     |    2 +-
 .../core/journal/impl/FileFactoryTestBase.java  |   18 +-
 .../core/journal/impl/JournalImplTestBase.java  |    6 +-
 .../core/journal/impl/JournalImplTestUnit.java  |    9 +-
 .../unit/core/journal/impl/ReclaimerTest.java   |    2 +-
 .../impl/SequentialFileFactoryTestBase.java     |   52 +-
 .../unit/core/journal/impl/TimedBufferTest.java |   26 +-
 .../impl/fakes/FakeSequentialFileFactory.java   |   84 +-
 .../tests/unit/core/paging/impl/PageTest.java   |   18 +-
 .../core/paging/impl/PagingStoreImplTest.java   |   14 +-
 .../impl/BatchIDGeneratorUnitTest.java          |    4 +-
 .../impl/OperationContextUnitTest.java          |    8 +-
 .../unit/core/server/impl/FileLockTest.java     |    4 +-
 183 files changed, 6522 insertions(+), 7227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 71af624..ce57485 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,4 @@ ratReport.txt
 **/cmake_install.cmake
 
 # this file is generated
-artemis-native/src/main/c/org_apache_activemq_artemis_core_libaio_Native.h
+artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..4681205
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
+
+SUBDIRS(artemis-native)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 65155f3..195441f 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ This file describes some minimum 'stuff one needs to know' to get started coding
 
 ## Source
 
-For details about the modifying the code, building the project, running tests, IDE integration, etc. see 
+For details about the modifying the code, building the project, running tests, IDE integration, etc. see
 our [Hacking Guide](./docs/hacking-guide/en/SUMMARY.md).
 
 ## Examples

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 7495ce2..2fdf5f2 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
+import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,8 +37,10 @@ import java.util.regex.Pattern;
 import io.airlift.airline.Arguments;
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
-import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
+import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.jlibaio.LibaioFile;
 
 import static java.nio.file.attribute.PosixFilePermission.GROUP_EXECUTE;
 import static java.nio.file.attribute.PosixFilePermission.GROUP_READ;
@@ -84,6 +87,7 @@ public class Create extends InputAbstract
    public static final String ETC_CLUSTER_SETTINGS_TXT = "etc/cluster-settings.txt";
    public static final String ETC_CONNECTOR_SETTINGS_TXT = "etc/connector-settings.txt";
    public static final String ETC_BOOTSTRAP_WEB_SETTINGS_TXT = "etc/bootstrap-web-settings.txt";
+   public static final String ETC_JOURNAL_BUFFER_SETTINGS = "etc/journal-buffer-settings.txt";
 
    @Arguments(description = "The instance directory to hold the broker's configuration and data.  Path must be writable.", required = true)
    File directory;
@@ -142,6 +146,9 @@ public class Create extends InputAbstract
    @Option(name = "--require-login", description = "This will configure security to require user / password, opposite of --allow-anonymous")
    Boolean requireLogin = null;
 
+   @Option(name = "--no-sync-test", description = "Disable the calculation for the buffer")
+   boolean noSyncTest;
+
    @Option(name = "--user", description = "The username (Default: input)")
    String user;
 
@@ -529,12 +536,16 @@ public class Create extends InputAbstract
          filters.put("${shared-store.settings}", "");
       }
 
-      if (IS_WINDOWS || !AsynchronousFileImpl.isLoaded())
+      boolean aio;
+
+      if (IS_WINDOWS || !supportsLibaio())
       {
+         aio = false;
          filters.put("${journal.settings}", "NIO");
       }
       else
       {
+         aio = true;
          filters.put("${journal.settings}", "ASYNCIO");
       }
 
@@ -590,7 +601,8 @@ public class Create extends InputAbstract
       new File(directory, "etc").mkdirs();
       new File(directory, "log").mkdirs();
       new File(directory, "tmp").mkdirs();
-      new File(directory, "data").mkdirs();
+      File dataFolder = new File(directory, "data");
+      dataFolder.mkdirs();
 
       if (javaOptions == null || javaOptions.length() == 0)
       {
@@ -638,7 +650,7 @@ public class Create extends InputAbstract
          filters.put("${bootstrap-web-settings}", applyFilters(readTextFile(ETC_BOOTSTRAP_WEB_SETTINGS_TXT), filters));
       }
 
-
+      performSyncCalc(filters, aio, dataFolder);
 
       write(ETC_BOOTSTRAP_XML, filters, false);
       write(ETC_BROKER_XML, filters, false);
@@ -694,6 +706,76 @@ public class Create extends InputAbstract
       return null;
    }
 
+   private void performSyncCalc(HashMap<String, String> filters, boolean aio, File dataFolder)
+   {
+      if (noSyncTest)
+      {
+         filters.put("${journal-buffer.settings}", "");
+      }
+      else
+      {
+         try
+         {
+            int writes = 250;
+            System.out.println("");
+            System.out.println("Performing write sync calculation...");
+
+            long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio);
+            long nanoseconds = SyncCalculation.toNanos(time, writes);
+            double writesPerMillisecond = (double)writes / (double) time;
+
+            String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
+
+            HashMap<String, String> syncFilter = new HashMap<String, String>();
+            syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
+            syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
+
+            System.out.println("done! Your system can make " + writesPerMillisecondStr +
+                    " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
+
+            filters.put("${journal-buffer.settings}", applyFilters(readTextFile(ETC_JOURNAL_BUFFER_SETTINGS), syncFilter));
+
+         }
+         catch (Exception e)
+         {
+            filters.put("${journal-buffer.settings}", "");
+            e.printStackTrace();
+            System.err.println("Couldn't perform sync calculation, using default values");
+         }
+      }
+   }
+
+   private boolean supportsLibaio()
+   {
+      if (LibaioContext.isLoaded())
+      {
+         try (LibaioContext context = new LibaioContext(1, true))
+         {
+            File tmpFile = new File(directory, "validateAIO.bin");
+            boolean supportsLibaio = true;
+            try
+            {
+               LibaioFile file = context.openFile(tmpFile, true);
+               file.close();
+            }
+            catch (Exception e)
+            {
+               supportsLibaio = false;
+            }
+            tmpFile.delete();
+            if (!supportsLibaio)
+            {
+               System.err.println("The filesystem used on " + directory + " doesn't support libAIO and O_DIRECT files, switching journal-type to NIO");
+            }
+            return supportsLibaio;
+         }
+      }
+      else
+      {
+         return false;
+      }
+   }
+
    private void makeExec(String path) throws IOException
    {
       try

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java
index 0239998..38fb785 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/CompactJournal.java
@@ -22,9 +22,9 @@ import io.airlift.airline.Command;
 import org.apache.activemq.artemis.cli.commands.Action;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 
 @Command(name = "compact", description = "Compacts the journal of a non running server")
 public final class CompactJournal extends DataAbstract implements Action
@@ -54,7 +54,7 @@ public final class CompactJournal extends DataAbstract implements Action
                                      final int fileSize,
                                      final IOCriticalErrorListener listener) throws Exception
    {
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
index 5b28a18..6b608c3 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
@@ -35,7 +35,7 @@ import org.apache.activemq.artemis.cli.commands.Configurable;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
-import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.utils.Base64;
 
 @Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
@@ -117,7 +117,7 @@ public class DecodeJournal extends Configurable implements Action
             System.err.println("Could not create directory " + directory);
       }
 
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
index 8b0721b..a408951 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
@@ -28,11 +28,11 @@ import org.apache.activemq.artemis.cli.commands.Action;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.cli.commands.Configurable;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
-import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.utils.Base64;
 
 @Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format")
@@ -113,7 +113,7 @@ public class EncodeJournal extends Configurable implements Action
                                     final int fileSize,
                                     final PrintStream out) throws Exception
    {
-      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null);
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index e099a0b..cf5be12 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -52,10 +52,10 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.message.BodyEncoder;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -345,7 +345,7 @@ public final class XmlDataExporter extends DataAbstract implements Action
 
    private void getJmsBindings() throws Exception
    {
-      SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation());
+      SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
 
       Journal jmsJournal = new JournalImpl(1024 * 1024,
                                            2,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
new file mode 100644
index 0000000..b5a8845
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+
+/**
+ * It will perform a simple test to evaluate how many syncs a disk can make per second
+ * * *
+ */
+public class SyncCalculation
+{
+   /**
+    * It will perform a write test of blockSize * bocks, sinc on each write, for N tries.
+    * It will return the lowest spent time from the tries.
+    */
+   public static long syncTest(File datafolder, int blockSize, int blocks, int tries, boolean verbose, boolean aio) throws Exception
+   {
+      SequentialFileFactory factory = newFactory(datafolder, aio);
+      SequentialFile file = factory.createSequentialFile("test.tmp");
+
+      try
+      {
+         file.delete();
+         file.open();
+
+         file.fill(blockSize * blocks);
+
+         long[] result = new long[tries];
+
+         byte[] block = new byte[blockSize];
+
+         for (int i = 0; i < block.length; i++)
+         {
+            block[i] = (byte) 't';
+         }
+
+         ByteBuffer bufferBlock = factory.newBuffer(blockSize);
+         bufferBlock.put(block);
+         bufferBlock.position(0);
+
+         final ReusableLatch latch = new ReusableLatch(0);
+
+         IOCallback callback = new IOCallback()
+         {
+            @Override
+            public void done()
+            {
+               latch.countDown();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage)
+            {
+
+            }
+         };
+
+         DecimalFormat dcformat = new DecimalFormat("###.##");
+         for (int ntry = 0; ntry < tries; ntry++)
+         {
+
+            if (verbose)
+            {
+               System.out.println("**************************************************");
+               System.out.println(ntry + " of " + tries + " calculation");
+            }
+            file.position(0);
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < blocks; i++)
+            {
+               bufferBlock.position(0);
+               latch.countUp();
+               file.writeDirect(bufferBlock, true, callback);
+               if (!latch.await(5, TimeUnit.SECONDS))
+               {
+                  throw new IOException("Callback wasn't called");
+               }
+            }
+            long end = System.currentTimeMillis();
+
+            result[ntry] = (end - start);
+
+            if (verbose)
+            {
+               double writesPerMillisecond = (double)blocks / (double) result[ntry];
+               System.out.println("Time = " + result[ntry]);
+               System.out.println("Writes / millisecond = " + dcformat.format(writesPerMillisecond));
+               System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks));
+               System.out.println("**************************************************");
+            }
+         }
+
+         factory.releaseDirectBuffer(bufferBlock);
+
+         long totalTime = Long.MAX_VALUE;
+         for (int i = 0; i < tries; i++)
+         {
+            if (result[i] < totalTime)
+            {
+               totalTime = result[i];
+            }
+         }
+
+         return totalTime;
+      }
+      finally
+      {
+         try
+         {
+            file.close();
+         }
+         catch (Exception e)
+         {
+         }
+         try
+         {
+            file.delete();
+         }
+         catch (Exception e)
+         {
+         }
+         try
+         {
+            factory.stop();
+         }
+         catch (Exception e)
+         {
+         }
+      }
+   }
+
+
+   public static long toNanos(long time, long blocks)
+   {
+
+      double blocksPerMillisecond = (double) blocks / (double) (time);
+
+      long nanoSeconds = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
+
+      long timeWait = (long) (nanoSeconds / blocksPerMillisecond);
+
+      return timeWait;
+   }
+
+   private static SequentialFileFactory newFactory(File datafolder, boolean aio)
+   {
+      if (aio && LibaioContext.isLoaded())
+      {
+         SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1);
+         factory.start();
+         ((AIOSequentialFileFactory) factory).disableBufferReuse();
+
+         return factory;
+      }
+      else
+      {
+         SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1);
+         factory.start();
+         return factory;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 52d665e..1cd3245 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -42,6 +42,7 @@ under the License.
       <large-messages-directory>${data.dir}/large-messages</large-messages-directory>
 
       <journal-min-files>10</journal-min-files>
+${journal-buffer.settings}
 ${connector-config.settings}
       <acceptors>
          <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports Core, OpenWire, Stomp and AMQP. -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
new file mode 100644
index 0000000..566c29e
--- /dev/null
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt
@@ -0,0 +1,8 @@
+
+      <!--
+       This value was determined through a calculation.
+       Your system could perform ${writesPerMillisecond} writes per millisecond
+       on the current journal configuration.
+       That translates as a sync write every ${nanoseconds} nanoseconds
+      -->
+      <journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index 99c8f23..3aed71e 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.cli.Artemis;
 import org.apache.activemq.artemis.cli.commands.Run;
+import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.junit.After;
@@ -69,15 +71,29 @@ public class ArtemisTest
    }
 
    @Test
+   public void testSync() throws Exception
+   {
+      int writes = 2560;
+      int tries = 10;
+      long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true);
+      System.out.println();
+      System.out.println("TotalAvg = " + totalAvg);
+      long nanoTime = SyncCalculation.toNanos(totalAvg, writes);
+      System.out.println("nanoTime avg = " + nanoTime);
+      Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
+
+   }
+
+   @Test
    public void testSimpleRun() throws Exception
    {
       Run.setEmbedded(true);
       Artemis.main("create", temporaryFolder.getRoot().getAbsolutePath(), "--force", "--silent-input", "--no-web");
       System.setProperty("artemis.instance", temporaryFolder.getRoot().getAbsolutePath());
       // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
-      Artemis.main("run");
-      Assert.assertEquals(Integer.valueOf(70), Artemis.execute("producer", "--txt-size", "50", "--message-count", "70", "--verbose"));
-      Assert.assertEquals(Integer.valueOf(70), Artemis.execute("consumer", "--txt-size", "50", "--verbose", "--break-on-null", "--receive-timeout", "100"));
+      Artemis.execute("run");
+      Assert.assertEquals(Integer.valueOf(1000), Artemis.execute("producer", "--message-count", "1000", "--verbose"));
+      Assert.assertEquals(Integer.valueOf(1000), Artemis.execute("consumer", "--verbose", "--break-on-null", "--receive-timeout", "100"));
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
       Connection connection = cf.createConnection();
@@ -116,6 +132,7 @@ public class ArtemisTest
 
       Artemis.execute("stop");
       Assert.assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
+      Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
 
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java
index 632e2c0..0ee3afb 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/StreamClassPathTest.java
@@ -49,6 +49,7 @@ public class StreamClassPathTest
       openStream(Create.ETC_CLUSTER_SETTINGS_TXT);
       openStream(Create.ETC_CONNECTOR_SETTINGS_TXT);
       openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
+      openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS);
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java
index 8a47dfa..e69a7fd 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQNativeIOError.java
@@ -34,4 +34,9 @@ public final class ActiveMQNativeIOError extends ActiveMQException
    {
       super(ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO, msg);
    }
+
+   public ActiveMQNativeIOError(String msg, Throwable e)
+   {
+      super(ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO, msg, e);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
index b3ef038..2d884e7 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
@@ -29,9 +29,9 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.JournalType;
@@ -87,7 +87,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager
 
       createDir = config.isCreateBindingsDir();
 
-      SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation());
+      SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
 
       Journal localJMS = new JournalImpl(1024 * 1024,
                                          2,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java
deleted file mode 100644
index 80aa753..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AIOCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio;
-
-/**
- * The interface used for AIO Callbacks.
- */
-public interface AIOCallback
-{
-   /**
-    * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk.
-    * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */
-   void done();
-
-   /**
-    * Method for error notifications.
-    * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
-   void onError(int errorCode, String errorMessage);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java
deleted file mode 100644
index 52b8e05..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/AsynchronousFile.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-public interface AsynchronousFile
-{
-   void close() throws InterruptedException, ActiveMQException;
-
-   /**
-    *
-    * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
-    * @param fileName
-    * @param maxIO The number of max concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
-    * @throws ActiveMQException
-    */
-   void open(String fileName, int maxIO) throws ActiveMQException;
-
-   /**
-    * Warning: This function will perform a synchronous IO, probably translating to a fstat call
-    * @throws ActiveMQException
-    * */
-   long size() throws ActiveMQException;
-
-   /** Any error will be reported on the callback interface */
-   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
-
-   /**
-    * Performs an internal direct write.
-    * @throws ActiveMQException
-    */
-   void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException;
-
-   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws ActiveMQException;
-
-   void fill(long position, int blocks, long size, byte fillChar) throws ActiveMQException;
-
-   void setBufferCallback(BufferCallback callback);
-
-   int getBlockSize();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java
deleted file mode 100644
index e7b0ca5..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/BufferCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio;
-
-import java.nio.ByteBuffer;
-
-/**
- * Used to receive a notification on completed buffers used by the AIO layer.
- */
-public interface BufferCallback
-{
-   void bufferDone(ByteBuffer buffer);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java
deleted file mode 100644
index 0cfe945..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/IOExceptionListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio;
-
-public interface IOExceptionListener
-{
-   void onIOException(Exception exception, String message);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java
deleted file mode 100644
index 0af3152..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/ActiveMQFileLock.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio.impl;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-
-import org.apache.activemq.artemis.core.libaio.Native;
-
-public class ActiveMQFileLock extends FileLock
-{
-
-   private final int handle;
-
-   protected ActiveMQFileLock(final int handle)
-   {
-      super((FileChannel)null, 0, 0, false);
-      this.handle = handle;
-   }
-
-   @Override
-   public boolean isValid()
-   {
-      return true;
-   }
-
-   @Override
-   public void release() throws IOException
-   {
-      Native.closeFile(handle);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java
deleted file mode 100644
index be4d885..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/asyncio/impl/AsynchronousFileImpl.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.PriorityQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.core.asyncio.AIOCallback;
-import org.apache.activemq.artemis.core.asyncio.AsynchronousFile;
-import org.apache.activemq.artemis.core.asyncio.BufferCallback;
-import org.apache.activemq.artemis.core.asyncio.IOExceptionListener;
-import org.apache.activemq.artemis.core.libaio.Native;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-import org.apache.activemq.artemis.utils.ReusableLatch;
-
-/**
- * AsynchronousFile implementation
- *
- *         Warning: Case you refactor the name or the package of this class
- *         You need to make sure you also rename the C++ native calls
- */
-public class AsynchronousFileImpl implements AsynchronousFile
-{
-   // Static ----------------------------------------------------------------------------
-
-   private static final AtomicInteger totalMaxIO = new AtomicInteger(0);
-
-   private static boolean loaded = false;
-
-   /**
-    * This definition needs to match Version.h on the native sources.
-    * <br>
-    * Or else the native module won't be loaded because of version mismatches
-    */
-   private static final int EXPECTED_NATIVE_VERSION = 52;
-
-   /**
-    * Used to determine the next writing sequence
-    */
-   private final AtomicLong nextWritingSequence = new AtomicLong(0);
-
-   /**
-    * Used to determine the next writing sequence.
-    * This is accessed from a single thread (the Poller Thread)
-    */
-   private long nextReadSequence = 0;
-
-   /**
-    * AIO can't guarantee ordering over callbacks.
-    * <br>
-    * We use this {@link PriorityQueue} to hold values until they are in order
-    */
-   private final PriorityQueue<CallbackHolder> pendingCallbacks = new PriorityQueue<CallbackHolder>();
-
-   public static void addMax(final int io)
-   {
-      AsynchronousFileImpl.totalMaxIO.addAndGet(io);
-   }
-
-   /**
-    * For test purposes
-    */
-   public static int getTotalMaxIO()
-   {
-      return AsynchronousFileImpl.totalMaxIO.get();
-   }
-
-   public static void resetMaxAIO()
-   {
-      AsynchronousFileImpl.totalMaxIO.set(0);
-   }
-
-   public static int openFile(String fileName)
-   {
-      return Native.openFile(fileName);
-   }
-
-   public static void closeFile(int handle)
-   {
-      Native.closeFile(handle);
-   }
-
-   public static void destroyBuffer(ByteBuffer buffer)
-   {
-      Native.destroyBuffer(buffer);
-   }
-
-   private static boolean loadLibrary(final String name)
-   {
-      try
-      {
-         ActiveMQJournalLogger.LOGGER.trace(name + " being loaded");
-         System.loadLibrary(name);
-         if (Native.getNativeVersion() != AsynchronousFileImpl.EXPECTED_NATIVE_VERSION)
-         {
-            ActiveMQJournalLogger.LOGGER.incompatibleNativeLibrary();
-            return false;
-         }
-         else
-         {
-            return true;
-         }
-      }
-      catch (Throwable e)
-      {
-         ActiveMQJournalLogger.LOGGER.debug(name + " -> error loading the native library", e);
-         return false;
-      }
-
-   }
-
-   static
-   {
-      String[] libraries = new String[]{"artemis-native", "artemis-native-64", "artemis-native-32"};
-
-      for (String library : libraries)
-      {
-         if (AsynchronousFileImpl.loadLibrary(library))
-         {
-            AsynchronousFileImpl.loaded = true;
-            break;
-         }
-         else
-         {
-            ActiveMQJournalLogger.LOGGER.debug("Library " + library + " not found!");
-         }
-      }
-
-      if (!AsynchronousFileImpl.loaded)
-      {
-         ActiveMQJournalLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper");
-      }
-   }
-
-   public static boolean isLoaded()
-   {
-      return AsynchronousFileImpl.loaded;
-   }
-
-   // Attributes ------------------------------------------------------------------------
-
-   private boolean opened = false;
-
-   private String fileName;
-
-   /**
-    * Used while inside the callbackDone and callbackError
-    */
-   private final Lock callbackLock = new ReentrantLock();
-
-   private final ReusableLatch pollerLatch = new ReusableLatch();
-
-   private volatile Runnable poller;
-
-   private int maxIO;
-
-   private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-
-   private final ReusableLatch pendingWrites = new ReusableLatch();
-
-   private Semaphore maxIOSemaphore;
-
-   private BufferCallback bufferCallback;
-
-   /**
-    * A callback for IO errors when they happen
-    */
-   private final IOExceptionListener ioExceptionListener;
-
-   /**
-    * Warning: Beware of the C++ pointer! It will bite you! :-)
-    */
-   private ByteBuffer handler;
-
-   // A context switch on AIO would make it to synchronize the disk before
-   // switching to the new thread, what would cause
-   // serious performance problems. Because of that we make all the writes on
-   // AIO using a single thread.
-   private final Executor writeExecutor;
-
-   private final Executor pollerExecutor;
-
-   // AsynchronousFile implementation ---------------------------------------------------
-
-   /**
-    * @param writeExecutor  It needs to be a single Thread executor. If null it will use the user thread to execute write operations
-    * @param pollerExecutor The thread pool that will initialize poller handlers
-    */
-   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final IOExceptionListener ioExceptionListener)
-   {
-      this.writeExecutor = writeExecutor;
-      this.pollerExecutor = pollerExecutor;
-      this.ioExceptionListener = ioExceptionListener;
-   }
-
-   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
-   {
-      this(writeExecutor, pollerExecutor, null);
-   }
-
-   public void open(final String fileName1, final int maxIOArgument) throws ActiveMQException
-   {
-      writeLock.lock();
-
-      try
-      {
-         if (opened)
-         {
-            throw new IllegalStateException("AsynchronousFile is already opened");
-         }
-
-         this.maxIO = maxIOArgument;
-         maxIOSemaphore = new Semaphore(this.maxIO);
-
-         this.fileName = fileName1;
-
-         try
-         {
-            handler = Native.init(AsynchronousFileImpl.class, fileName1, this.maxIO, ActiveMQJournalLogger.LOGGER);
-         }
-         catch (ActiveMQException e)
-         {
-            ActiveMQException ex = null;
-            if (e.getType() == ActiveMQExceptionType.NATIVE_ERROR_CANT_INITIALIZE_AIO)
-            {
-               ex = new ActiveMQException(e.getType(),
-                                         "Can't initialize AIO. Currently AIO in use = " + AsynchronousFileImpl.totalMaxIO.get() +
-                                            ", trying to allocate more " +
-                                            maxIOArgument,
-                                         e);
-            }
-            else
-            {
-               ex = e;
-            }
-            throw ex;
-         }
-         opened = true;
-         AsynchronousFileImpl.addMax(this.maxIO);
-         nextWritingSequence.set(0);
-         nextReadSequence = 0;
-      }
-      finally
-      {
-         writeLock.unlock();
-      }
-   }
-
-   public void close() throws InterruptedException, ActiveMQException
-   {
-      checkOpened();
-
-      writeLock.lock();
-
-      try
-      {
-
-         while (!pendingWrites.await(60000))
-         {
-            ActiveMQJournalLogger.LOGGER.couldNotGetLock(fileName);
-         }
-
-         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
-         {
-            ActiveMQJournalLogger.LOGGER.couldNotGetLock(fileName);
-         }
-
-         maxIOSemaphore = null;
-         if (poller != null)
-         {
-            stopPoller();
-         }
-
-         if (handler != null)
-         {
-            Native.closeInternal(handler);
-            AsynchronousFileImpl.addMax(-maxIO);
-         }
-         opened = false;
-         handler = null;
-      }
-      finally
-      {
-         writeLock.unlock();
-      }
-   }
-
-
-   public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws ActiveMQException
-   {
-      try
-      {
-         Native.writeInternal(handler, positionToWrite, size, bytes);
-      }
-      catch (ActiveMQException e)
-      {
-         fireExceptionListener(e.getType().getCode(), e.getMessage());
-         throw e;
-      }
-      if (bufferCallback != null)
-      {
-         bufferCallback.bufferDone(bytes);
-      }
-   }
-
-
-   public void write(final long position,
-                     final long size,
-                     final ByteBuffer directByteBuffer,
-                     final AIOCallback aioCallback)
-   {
-      if (aioCallback == null)
-      {
-         throw new NullPointerException("Null Callback");
-      }
-
-      checkOpened();
-      if (poller == null)
-      {
-         startPoller();
-      }
-
-      pendingWrites.countUp();
-
-      if (writeExecutor != null)
-      {
-         maxIOSemaphore.acquireUninterruptibly();
-
-         writeExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               long sequence = nextWritingSequence.getAndIncrement();
-
-               try
-               {
-                  Native.write(AsynchronousFileImpl.this, handler, sequence, position, size, directByteBuffer, aioCallback);
-               }
-               catch (ActiveMQException e)
-               {
-                  callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage());
-               }
-               catch (RuntimeException e)
-               {
-                  callbackError(aioCallback,
-                                sequence,
-                                directByteBuffer,
-                                ActiveMQExceptionType.INTERNAL_ERROR.getCode(),
-                                e.getMessage());
-               }
-            }
-         });
-      }
-      else
-      {
-         maxIOSemaphore.acquireUninterruptibly();
-
-         long sequence = nextWritingSequence.getAndIncrement();
-
-         try
-         {
-            Native.write(this, handler, sequence, position, size, directByteBuffer, aioCallback);
-         }
-         catch (ActiveMQException e)
-         {
-            callbackError(aioCallback, sequence, directByteBuffer, e.getType().getCode(), e.getMessage());
-         }
-         catch (RuntimeException e)
-         {
-            callbackError(aioCallback, sequence, directByteBuffer, ActiveMQExceptionType.INTERNAL_ERROR.getCode(), e.getMessage());
-         }
-      }
-
-   }
-
-   public void read(final long position,
-                    final long size,
-                    final ByteBuffer directByteBuffer,
-                    final AIOCallback aioPackage) throws ActiveMQException
-   {
-      checkOpened();
-      if (poller == null)
-      {
-         startPoller();
-      }
-      pendingWrites.countUp();
-      maxIOSemaphore.acquireUninterruptibly();
-      try
-      {
-         Native.read(this, handler, position, size, directByteBuffer, aioPackage);
-      }
-      catch (ActiveMQException e)
-      {
-         // Release only if an exception happened
-         maxIOSemaphore.release();
-         pendingWrites.countDown();
-         throw e;
-      }
-      catch (RuntimeException e)
-      {
-         // Release only if an exception happened
-         maxIOSemaphore.release();
-         pendingWrites.countDown();
-         throw e;
-      }
-   }
-
-   public long size() throws ActiveMQException
-   {
-      checkOpened();
-      return Native.size0(handler);
-   }
-
-   public void fill(final long position, final int blocks, final long size, final byte fillChar) throws ActiveMQException
-   {
-      checkOpened();
-      try
-      {
-         Native.fill(handler, position, blocks, size, fillChar);
-      }
-      catch (ActiveMQException e)
-      {
-         fireExceptionListener(e.getType().getCode(), e.getMessage());
-         throw e;
-      }
-   }
-
-   public int getBlockSize()
-   {
-      return 512;
-   }
-
-   /**
-    * This needs to be synchronized because of
-    * http://bugs.sun.com/view_bug.do?bug_id=6791815
-    * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
-    */
-   public static synchronized ByteBuffer newBuffer(final int size)
-   {
-      if (size % 512 != 0)
-      {
-         throw new RuntimeException("Buffer size needs to be aligned to 512");
-      }
-
-      return Native.newNativeBuffer(size);
-   }
-
-   public void setBufferCallback(final BufferCallback callback)
-   {
-      bufferCallback = callback;
-   }
-
-   /**
-    * Return the JNI handler used on C++
-    */
-   public ByteBuffer getHandler()
-   {
-      return handler;
-   }
-
-   public static void clearBuffer(final ByteBuffer buffer)
-   {
-      Native.resetBuffer(buffer, buffer.limit());
-      buffer.position(0);
-   }
-
-   // Protected -------------------------------------------------------------------------
-
-   @Override
-   protected void finalize()
-   {
-      if (opened)
-      {
-         ActiveMQJournalLogger.LOGGER.fileFinalizedWhileOpen(fileName);
-      }
-   }
-
-   // Private ---------------------------------------------------------------------------
-
-   private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
-   {
-      maxIOSemaphore.release();
-
-      pendingWrites.countDown();
-
-      callbackLock.lock();
-
-      try
-      {
-
-         if (sequence == -1)
-         {
-            callback.done();
-         }
-         else
-         {
-            if (sequence == nextReadSequence)
-            {
-               nextReadSequence++;
-               callback.done();
-               flushCallbacks();
-            }
-            else
-            {
-               pendingCallbacks.add(new CallbackHolder(sequence, callback));
-            }
-         }
-
-         // The buffer is not sent on callback for read operations
-         if (bufferCallback != null && buffer != null)
-         {
-            bufferCallback.bufferDone(buffer);
-         }
-      }
-      finally
-      {
-         callbackLock.unlock();
-      }
-   }
-
-   private void flushCallbacks()
-   {
-      while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
-      {
-         CallbackHolder holder = pendingCallbacks.poll();
-         if (holder.isError())
-         {
-            ErrorCallback error = (ErrorCallback) holder;
-            holder.callback.onError(error.errorCode, error.message);
-         }
-         else
-         {
-            holder.callback.done();
-         }
-         nextReadSequence++;
-      }
-   }
-
-   // Called by the JNI layer.. just ignore the
-   // warning
-   private void callbackError(final AIOCallback callback,
-                              final long sequence,
-                              final ByteBuffer buffer,
-                              final int errorCode,
-                              final String errorMessage)
-   {
-      ActiveMQJournalLogger.LOGGER.callbackError(errorMessage);
-
-      fireExceptionListener(errorCode, errorMessage);
-
-      maxIOSemaphore.release();
-
-      pendingWrites.countDown();
-
-      callbackLock.lock();
-
-      try
-      {
-         if (sequence == -1)
-         {
-            callback.onError(errorCode, errorMessage);
-         }
-         else
-         {
-            if (sequence == nextReadSequence)
-            {
-               nextReadSequence++;
-               callback.onError(errorCode, errorMessage);
-               flushCallbacks();
-            }
-            else
-            {
-               pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode, errorMessage));
-            }
-         }
-      }
-      finally
-      {
-         callbackLock.unlock();
-      }
-
-      // The buffer is not sent on callback for read operations
-      if (bufferCallback != null && buffer != null)
-      {
-         bufferCallback.bufferDone(buffer);
-      }
-   }
-
-   /**
-    * This is called by the native layer
-    *
-    * @param errorCode
-    * @param errorMessage
-    */
-   private void fireExceptionListener(final int errorCode, final String errorMessage)
-   {
-      ActiveMQJournalLogger.LOGGER.ioError(errorCode, errorMessage);
-      if (ioExceptionListener != null)
-      {
-         ioExceptionListener.onIOException(ActiveMQExceptionType.getType(errorCode).createException(errorMessage), errorMessage);
-      }
-   }
-
-   private void pollEvents()
-   {
-      if (!opened)
-      {
-         return;
-      }
-      Native.internalPollEvents(handler);
-   }
-
-   private void startPoller()
-   {
-      writeLock.lock();
-
-      try
-      {
-
-         if (poller == null)
-         {
-            pollerLatch.countUp();
-            poller = new PollerRunnable();
-            try
-            {
-               pollerExecutor.execute(poller);
-            }
-            catch (Exception ex)
-            {
-               ActiveMQJournalLogger.LOGGER.errorStartingPoller(ex);
-            }
-         }
-      }
-      finally
-      {
-         writeLock.unlock();
-      }
-   }
-
-   private void checkOpened()
-   {
-      if (!opened)
-      {
-         throw new RuntimeException("File is not opened");
-      }
-   }
-
-   /**
-    * @throws ActiveMQException
-    * @throws InterruptedException
-    */
-   private void stopPoller() throws ActiveMQException, InterruptedException
-   {
-      Native.stopPoller(handler);
-      // We need to make sure we won't call close until Poller is
-      // completely done, or we might get beautiful GPFs
-      pollerLatch.await();
-   }
-
-   public static FileLock lock(int handle)
-   {
-      if (Native.flock(handle))
-      {
-         return new ActiveMQFileLock(handle);
-      }
-      else
-      {
-         return null;
-      }
-   }
-
-   // Native ----------------------------------------------------------------------------
-
-
-   /**
-    * Explicitly adding a compare to clause that returns 0 for at least the same object.
-    * <br>
-    * If {@link Comparable#compareTo(Object)} does not return 0 -for at least the same object- some
-    * Collection classes methods will fail (example {@link PriorityQueue#remove(Object)}. If it
-    * returns 0, then {@link #equals(Object)} must return {@code true} for the exact same cases,
-    * otherwise we will get compatibility problems between Java5 and Java6.
-    */
-   private static class CallbackHolder implements Comparable<CallbackHolder>
-   {
-      final long sequence;
-
-      final AIOCallback callback;
-
-      public boolean isError()
-      {
-         return false;
-      }
-
-      public CallbackHolder(final long sequence, final AIOCallback callback)
-      {
-         this.sequence = sequence;
-         this.callback = callback;
-      }
-
-      public int compareTo(final CallbackHolder o)
-      {
-         // It shouldn't be equals in any case
-         if (this == o)
-            return 0;
-         if (sequence <= o.sequence)
-         {
-            return -1;
-         }
-         else
-         {
-            return 1;
-         }
-      }
-
-      /**
-       * See {@link CallbackHolder}.
-       */
-      @Override
-      public int hashCode()
-      {
-         return super.hashCode();
-      }
-
-      /**
-       * See {@link CallbackHolder}.
-       */
-      @Override
-      public boolean equals(Object obj)
-      {
-         return super.equals(obj);
-      }
-   }
-
-   private static final class ErrorCallback extends CallbackHolder
-   {
-      final int errorCode;
-
-      final String message;
-
-      @Override
-      public boolean isError()
-      {
-         return true;
-      }
-
-      public ErrorCallback(final long sequence, final AIOCallback callback, final int errorCode, final String message)
-      {
-         super(sequence, callback);
-
-         this.errorCode = errorCode;
-
-         this.message = message;
-      }
-
-      /**
-       * See {@link CallbackHolder}.
-       */
-      @Override
-      public int hashCode()
-      {
-         return super.hashCode();
-      }
-
-      /**
-       * See {@link CallbackHolder}.
-       */
-      @Override
-      public boolean equals(Object obj)
-      {
-         return super.equals(obj);
-      }
-   }
-
-   private class PollerRunnable implements Runnable
-   {
-      PollerRunnable()
-      {
-      }
-
-      public void run()
-      {
-         try
-         {
-            pollEvents();
-         }
-         finally
-         {
-            // This gives us extra protection in cases of interruption
-            // Case the poller thread is interrupted, this will allow us to
-            // restart the thread when required
-            poller = null;
-            pollerLatch.countDown();
-         }
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
new file mode 100644
index 0000000..acc0732
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
+import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+public abstract class AbstractSequentialFile implements SequentialFile
+{
+
+   private File file;
+
+   protected final File directory;
+
+   protected final SequentialFileFactory factory;
+
+   protected long fileSize = 0;
+
+   protected final AtomicLong position = new AtomicLong(0);
+
+   protected TimedBuffer timedBuffer;
+
+   /**
+    * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
+    * This is the class returned to the factory when the file is being activated.
+    */
+   protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+
+   /**
+    * Used for asynchronous writes
+    */
+   protected final Executor writerExecutor;
+
+   /**
+    * @param file
+    * @param directory
+    */
+   public AbstractSequentialFile(final File directory,
+                                 final String file,
+                                 final SequentialFileFactory factory,
+                                 final Executor writerExecutor)
+   {
+      super();
+      this.file = new File(directory, file);
+      this.directory = directory;
+      this.factory = factory;
+      this.writerExecutor = writerExecutor;
+   }
+
+   // Public --------------------------------------------------------
+
+   public final boolean exists()
+   {
+      return file.exists();
+   }
+
+   public final String getFileName()
+   {
+      return file.getName();
+   }
+
+   public final void delete() throws IOException, InterruptedException, ActiveMQException
+   {
+      if (isOpen())
+      {
+         close();
+      }
+
+      if (file.exists() && !file.delete())
+      {
+         ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
+      }
+   }
+
+   public void copyTo(SequentialFile newFileName) throws Exception
+   {
+      try
+      {
+         ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName);
+         if (!newFileName.isOpen())
+         {
+            newFileName.open();
+         }
+
+         if (!isOpen())
+         {
+            this.open();
+         }
+
+
+         ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+
+         for (;;)
+         {
+            buffer.rewind();
+            int size = this.read(buffer);
+            newFileName.writeDirect(buffer, false);
+            if (size < 10 * 1024)
+            {
+               break;
+            }
+         }
+         newFileName.close();
+         this.close();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   /**
+    * @throws IOException only declare exception due to signature. Sub-class needs it.
+    */
+   @Override
+   public void position(final long pos) throws IOException
+   {
+      position.set(pos);
+   }
+
+   public long position()
+   {
+      return position.get();
+   }
+
+   public final void renameTo(final String newFileName) throws IOException, InterruptedException,
+      ActiveMQException
+   {
+      try
+      {
+         close();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      File newFile = new File(directory + "/" + newFileName);
+
+      if (!file.equals(newFile))
+      {
+         if (!file.renameTo(newFile))
+         {
+            throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
+         }
+         file = newFile;
+      }
+   }
+
+   /**
+    * @throws IOException      we declare throwing IOException because sub-classes need to do it
+    * @throws ActiveMQException
+    */
+   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
+   {
+      final CountDownLatch donelatch = new CountDownLatch(1);
+
+      if (writerExecutor != null)
+      {
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               donelatch.countDown();
+            }
+         });
+
+         while (!donelatch.await(60, TimeUnit.SECONDS))
+         {
+            ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
+         }
+      }
+   }
+
+   public final boolean fits(final int size)
+   {
+      if (timedBuffer == null)
+      {
+         return position.get() + size <= fileSize;
+      }
+      else
+      {
+         return timedBuffer.checkSize(size);
+      }
+   }
+
+   public void setTimedBuffer(final TimedBuffer buffer)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.setObserver(null);
+      }
+
+      timedBuffer = buffer;
+
+      if (buffer != null)
+      {
+         buffer.setObserver(timedBufferObserver);
+      }
+
+   }
+
+   public void write(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) throws IOException
+   {
+      if (timedBuffer != null)
+      {
+         bytes.setIndex(0, bytes.capacity());
+         timedBuffer.addBytes(bytes, sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+         buffer.put(bytes.toByteBuffer().array());
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
+
+   public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException,
+      ActiveMQException
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   public void write(final EncodingSupport bytes, final boolean sync, final IOCallback callback)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.addBytes(bytes, sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
+
+         // If not using the TimedBuffer, a final copy is necessary
+         // Because AIO will need a specific Buffer
+         // And NIO will also need a whole buffer to perform the write
+
+         ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
+         bytes.encode(outBuffer);
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
+
+   public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   protected File getFile()
+   {
+      return file;
+   }
+
+   private static final class DelegateCallback implements IOCallback
+   {
+      final List<IOCallback> delegates;
+
+      private DelegateCallback(final List<IOCallback> delegates)
+      {
+         this.delegates = delegates;
+      }
+
+      public void done()
+      {
+         for (IOCallback callback : delegates)
+         {
+            try
+            {
+               callback.done();
+            }
+            catch (Throwable e)
+            {
+               ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
+            }
+         }
+      }
+
+      public void onError(final int errorCode, final String errorMessage)
+      {
+         for (IOCallback callback : delegates)
+         {
+            try
+            {
+               callback.onError(errorCode, errorMessage);
+            }
+            catch (Throwable e)
+            {
+               ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
+            }
+         }
+      }
+   }
+
+   protected ByteBuffer newBuffer(int size, int limit)
+   {
+      size = factory.calculateBlockSize(size);
+      limit = factory.calculateBlockSize(limit);
+
+      ByteBuffer buffer = factory.newBuffer(size);
+      buffer.limit(limit);
+      return buffer;
+   }
+
+   protected class LocalBufferObserver implements TimedBufferObserver
+   {
+      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks)
+      {
+         buffer.flip();
+
+         if (buffer.limit() == 0)
+         {
+            factory.releaseBuffer(buffer);
+         }
+         else
+         {
+            writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+         }
+      }
+
+      public ByteBuffer newBuffer(final int size, final int limit)
+      {
+         return AbstractSequentialFile.this.newBuffer(size, limit);
+      }
+
+      public int getRemainingBytes()
+      {
+         if (fileSize - position.get() > Integer.MAX_VALUE)
+         {
+            return Integer.MAX_VALUE;
+         }
+         else
+         {
+            return (int)(fileSize - position.get());
+         }
+      }
+
+      @Override
+      public String toString()
+      {
+         return "TimedBufferObserver on file (" + getFile().getName() + ")";
+      }
+
+   }
+
+   @Override
+   public File getJavaFile()
+   {
+      return getFile().getAbsoluteFile();
+   }
+}


Mime
View raw message