activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [5/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring
Date Thu, 30 Jul 2015 09:14:27 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java
deleted file mode 100644
index 0c982dc..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-/**
- * A SyncSpeedTest
- *
- * This class just provides some diagnostics on how fast your disk can sync
- * Useful when determining performance issues
- */
-public class SyncSpeedTest
-{
-   public static void main(final String[] args)
-   {
-      try
-      {
-         new SyncSpeedTest().testScaleAIO();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   protected SequentialFileFactory fileFactory;
-
-   public boolean AIO = true;
-
-   protected void setupFactory()
-   {
-      if (AIO)
-      {
-         fileFactory = new AIOSequentialFileFactory(new File("."), 0, 0, false, null);
-      }
-      else
-      {
-         fileFactory = new NIOSequentialFileFactory(new File("."), false, 0, 0, false, null);
-      }
-   }
-
-   protected SequentialFile createSequentialFile(final String fileName)
-   {
-      if (AIO)
-      {
-         return new AIOSequentialFile(fileFactory,
-                                      0,
-                                      0,
-                                      new File("."),
-                                      fileName,
-                                      100000,
-                                      null,
-                                      null,
-                                      Executors.newSingleThreadExecutor());
-      }
-      else
-      {
-         return new NIOSequentialFile(fileFactory, new File("."), fileName, 1000, null);
-      }
-   }
-
-   public void run2() throws Exception
-   {
-      setupFactory();
-
-      int recordSize = 128 * 1024;
-
-      while (true)
-      {
-         System.out.println("** record size is " + recordSize);
-
-         int warmup = 500;
-
-         int its = 500;
-
-         int fileSize = (its + warmup) * recordSize;
-
-         SequentialFile file = createSequentialFile("sync-speed-test.dat");
-
-         if (file.exists())
-         {
-            file.delete();
-         }
-
-         file.open();
-
-         file.fill(0, fileSize, (byte)'X');
-
-         if (!AIO)
-         {
-            file.sync();
-         }
-
-         ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-
-         long start = 0;
-
-         for (int i = 0; i < its + warmup; i++)
-         {
-            if (i == warmup)
-            {
-               start = System.currentTimeMillis();
-            }
-
-            bb1.rewind();
-
-            file.writeDirect(bb1, true);
-         }
-
-         long end = System.currentTimeMillis();
-
-         double rate = 1000 * (double)its / (end - start);
-
-         double throughput = recordSize * rate;
-
-         System.out.println("Rate of " + rate + " syncs per sec");
-         System.out.println("Throughput " + throughput + " bytes per sec");
-         System.out.println("*************");
-
-         recordSize *= 2;
-      }
-   }
-
-   public void run() throws Exception
-   {
-      int recordSize = 256;
-
-      while (true)
-      {
-         System.out.println("** record size is " + recordSize);
-
-         int warmup = 500;
-
-         int its = 500;
-
-         int fileSize = (its + warmup) * recordSize;
-
-         File file = new File("sync-speed-test.dat");
-
-         if (file.exists())
-         {
-            if (!file.delete())
-            {
-               ActiveMQJournalLogger.LOGGER.errorDeletingFile(file);
-            }
-         }
-
-         boolean created = file.createNewFile();
-         if (!created)
-            throw new IOException("could not create file " + file);
-
-         RandomAccessFile rfile = new RandomAccessFile(file, "rw");
-
-         FileChannel channel = rfile.getChannel();
-
-         ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
-
-         write(bb, channel, fileSize);
-
-         channel.force(true);
-
-         channel.position(0);
-
-         ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-
-         long start = 0;
-
-         for (int i = 0; i < its + warmup; i++)
-         {
-            if (i == warmup)
-            {
-               start = System.currentTimeMillis();
-            }
-
-            bb1.flip();
-            channel.write(bb1);
-            channel.force(false);
-         }
-
-         long end = System.currentTimeMillis();
-
-         double rate = 1000 * (double)its / (end - start);
-
-         double throughput = recordSize * rate;
-
-         System.out.println("Rate of " + rate + " syncs per sec");
-         System.out.println("Throughput " + throughput + " bytes per sec");
-
-         recordSize *= 2;
-      }
-   }
-
-   public void testScaleAIO() throws Exception
-   {
-      setupFactory();
-
-      final int recordSize = 1024;
-
-      System.out.println("** record size is " + recordSize);
-
-      final int its = 10;
-
-      for (int numThreads = 1; numThreads <= 10; numThreads++)
-      {
-
-         int fileSize = its * recordSize * numThreads;
-
-         final SequentialFile file = createSequentialFile("sync-speed-test.dat");
-
-         if (file.exists())
-         {
-            file.delete();
-         }
-
-         file.open();
-
-         file.fill(0, fileSize, (byte)'X');
-
-         if (!AIO)
-         {
-            file.sync();
-         }
-
-         final CountDownLatch latch = new CountDownLatch(its * numThreads);
-
-         class MyIOAsyncTask implements IOAsyncTask
-         {
-            public void done()
-            {
-               latch.countDown();
-            }
-
-            public void onError(final int errorCode, final String errorMessage)
-            {
-
-            }
-         }
-
-         final MyIOAsyncTask task = new MyIOAsyncTask();
-
-         class MyRunner implements Runnable
-         {
-            private final ByteBuffer bb1;
-
-            MyRunner()
-            {
-               bb1 = generateBuffer(recordSize, (byte)'h');
-            }
-
-            public void run()
-            {
-               for (int i = 0; i < its; i++)
-               {
-                  bb1.rewind();
-
-                  file.writeDirect(bb1, true, task);
-                  // try
-                  // {
-                  // file.writeDirect(bb1, true);
-                  // }
-                  // catch (Exception e)
-                  // {
-                  // e.printStackTrace();
-                  // }
-               }
-            }
-         }
-
-         Set<Thread> threads = new HashSet<Thread>();
-
-         for (int i = 0; i < numThreads; i++)
-         {
-            MyRunner runner = new MyRunner();
-
-            Thread t = new Thread(runner);
-
-            threads.add(t);
-         }
-
-         long start = System.currentTimeMillis();
-
-         for (Thread t : threads)
-         {
-            ActiveMQJournalLogger.LOGGER.startingThread();
-            t.start();
-         }
-
-         for (Thread t : threads)
-         {
-            t.join();
-         }
-
-         latch.await();
-
-         long end = System.currentTimeMillis();
-
-         double rate = 1000 * (double)its * numThreads / (end - start);
-
-         double throughput = recordSize * rate;
-
-         System.out.println("For " + numThreads + " threads:");
-         System.out.println("Rate of " + rate + " records per sec");
-         System.out.println("Throughput " + throughput + " bytes per sec");
-         System.out.println("*************");
-      }
-   }
-
-   private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception
-   {
-      buffer.flip();
-
-      channel.write(buffer);
-   }
-
-   private ByteBuffer generateBuffer(final int size, final byte ch)
-   {
-      ByteBuffer bb = ByteBuffer.allocateDirect(size);
-
-      for (int i = 0; i < size; i++)
-      {
-         bb.put(ch);
-      }
-
-      return bb;
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java
deleted file mode 100644
index 45d4b62..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-public class TimedBuffer
-{
-   // Constants -----------------------------------------------------
-
-   // The number of tries on sleep before switching to spin
-   public static final int MAX_CHECKS_ON_SLEEP = 20;
-
-   // Attributes ----------------------------------------------------
-
-   private TimedBufferObserver bufferObserver;
-
-   // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
-   // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
-   // prevent that
-   private final Semaphore spinLimiter = new Semaphore(1);
-
-   private CheckTimer timerRunnable = new CheckTimer();
-
-   private final int bufferSize;
-
-   private final ActiveMQBuffer buffer;
-
-   private int bufferLimit = 0;
-
-   private List<IOAsyncTask> callbacks;
-
-   private volatile int timeout;
-
-   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
-   private volatile boolean pendingSync = false;
-
-   private Thread timerThread;
-
-   private volatile boolean started;
-
-   // We use this flag to prevent flush occurring between calling checkSize and addBytes
-   // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer
-   // can get in an inconsistent state
-   private boolean delayFlush;
-
-   // for logging write rates
-
-   private final boolean logRates;
-
-   private final AtomicLong bytesFlushed = new AtomicLong(0);
-
-   private final AtomicLong flushesDone = new AtomicLong(0);
-
-   private Timer logRatesTimer;
-
-   private TimerTask logRatesTimerTask;
-
-   private boolean useSleep = true;
-
-   // no need to be volatile as every access is synchronized
-   private boolean spinning = false;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public TimedBuffer(final int size, final int timeout, final boolean logRates)
-   {
-      bufferSize = size;
-
-      this.logRates = logRates;
-
-      if (logRates)
-      {
-         logRatesTimer = new Timer(true);
-      }
-      // Setting the interval for nano-sleeps
-
-      buffer = ActiveMQBuffers.fixedBuffer(bufferSize);
-
-      buffer.clear();
-
-      bufferLimit = 0;
-
-      callbacks = new ArrayList<IOAsyncTask>();
-
-      this.timeout = timeout;
-   }
-
-   // for Debug purposes
-   public synchronized boolean isUseSleep()
-   {
-      return useSleep;
-   }
-
-   public synchronized void setUseSleep(boolean useSleep)
-   {
-      this.useSleep = useSleep;
-   }
-
-   public synchronized void start()
-   {
-      if (started)
-      {
-         return;
-      }
-
-      // Need to start with the spin limiter acquired
-      try
-      {
-         spinLimiter.acquire();
-      }
-      catch (InterruptedException e)
-      {
-         throw new ActiveMQInterruptedException(e);
-      }
-
-      timerRunnable = new CheckTimer();
-
-      timerThread = new Thread(timerRunnable, "activemq-buffer-timeout");
-
-      timerThread.start();
-
-      if (logRates)
-      {
-         logRatesTimerTask = new LogRatesTimerTask();
-
-         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
-      }
-
-      started = true;
-   }
-
-   public void stop()
-   {
-      if (!started)
-      {
-         return;
-      }
-
-      flush();
-
-      bufferObserver = null;
-
-      timerRunnable.close();
-
-      spinLimiter.release();
-
-      if (logRates)
-      {
-         logRatesTimerTask.cancel();
-      }
-
-      while (timerThread.isAlive())
-      {
-         try
-         {
-            timerThread.join();
-         }
-         catch (InterruptedException e)
-         {
-            throw new ActiveMQInterruptedException(e);
-         }
-      }
-
-      started = false;
-   }
-
-   public synchronized void setObserver(final TimedBufferObserver observer)
-   {
-      if (bufferObserver != null)
-      {
-         flush();
-      }
-
-      bufferObserver = observer;
-   }
-
-   /**
-    * Verify if the size fits the buffer
-    *
-    * @param sizeChecked
-    */
-   public synchronized boolean checkSize(final int sizeChecked)
-   {
-      if (!started)
-      {
-         throw new IllegalStateException("TimedBuffer is not started");
-      }
-
-      if (sizeChecked > bufferSize)
-      {
-         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
-                                            ") on the journal");
-      }
-
-      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
-      {
-         // Either there is not enough space left in the buffer for the sized record
-         // Or a flush has just been performed and we need to re-calcualate bufferLimit
-
-         flush();
-
-         delayFlush = true;
-
-         final int remainingInFile = bufferObserver.getRemainingBytes();
-
-         if (sizeChecked > remainingInFile)
-         {
-            return false;
-         }
-         else
-         {
-            // There is enough space in the file for this size
-
-            // Need to re-calculate buffer limit
-
-            bufferLimit = Math.min(remainingInFile, bufferSize);
-
-            return true;
-         }
-      }
-      else
-      {
-         delayFlush = true;
-
-         return true;
-      }
-   }
-
-   public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
-   }
-
-   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      if (!started)
-      {
-         throw new IllegalStateException("TimedBuffer is not started");
-      }
-
-      delayFlush = false;
-
-      bytes.encode(buffer);
-
-      callbacks.add(callback);
-
-      if (sync)
-      {
-         pendingSync = true;
-
-         startSpin();
-      }
-
-   }
-
-   public void flush()
-   {
-      flush(false);
-   }
-
-   /**
-    * force means the Journal is moving to a new file. Any pending write need to be done immediately
-    * or data could be lost
-    */
-   public void flush(final boolean force)
-   {
-      synchronized (this)
-      {
-         if (!started)
-         {
-            throw new IllegalStateException("TimedBuffer is not started");
-         }
-
-         if ((force || !delayFlush) && buffer.writerIndex() > 0)
-         {
-            int pos = buffer.writerIndex();
-
-            if (logRates)
-            {
-               bytesFlushed.addAndGet(pos);
-            }
-
-            ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-
-            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-            // Using bufferToFlush.put(buffer) would make several append calls for each byte
-            // We also transfer the content of this buffer to the native file's buffer
-
-            bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
-
-            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
-
-            stopSpin();
-
-            pendingSync = false;
-
-            // swap the instance as the previous callback list is being used asynchronously
-            callbacks = new LinkedList<IOAsyncTask>();
-
-            buffer.clear();
-
-            bufferLimit = 0;
-
-            flushesDone.incrementAndGet();
-         }
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   private class LogRatesTimerTask extends TimerTask
-   {
-      private boolean closed;
-
-      private long lastExecution;
-
-      private long lastBytesFlushed;
-
-      private long lastFlushesDone;
-
-      @Override
-      public synchronized void run()
-      {
-         if (!closed)
-         {
-            long now = System.currentTimeMillis();
-
-            long bytesF = bytesFlushed.get();
-            long flushesD = flushesDone.get();
-
-            if (lastExecution != 0)
-            {
-               double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
-               ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
-               double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
-               ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
-            }
-
-            lastExecution = now;
-
-            lastBytesFlushed = bytesF;
-
-            lastFlushesDone = flushesD;
-         }
-      }
-
-      @Override
-      public synchronized boolean cancel()
-      {
-         closed = true;
-
-         return super.cancel();
-      }
-   }
-
-   private class CheckTimer implements Runnable
-   {
-      private volatile boolean closed = false;
-
-      int checks = 0;
-      int failedChecks = 0;
-      long timeBefore = 0;
-
-      final int sleepMillis = timeout / 1000000; // truncates
-      final int sleepNanos = timeout % 1000000;
-
-
-      public void run()
-      {
-         long lastFlushTime = 0;
-
-         while (!closed)
-         {
-            // We flush on the timer if there are pending syncs there and we've waited at least one
-            // timeout since the time of the last flush.
-            // Effectively flushing "resets" the timer
-            // On the timeout verification, notice that we ignore the timeout check if we are using sleep
-
-            if (pendingSync)
-            {
-               if (isUseSleep())
-               {
-                  // if using sleep, we will always flush
-                  flush();
-                  lastFlushTime = System.nanoTime();
-               }
-               else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout)
-               {
-                  // if not using flush we will spin and do the time checks manually
-                  flush();
-                  lastFlushTime = System.nanoTime();
-               }
-
-            }
-
-            sleepIfPossible();
-
-            try
-            {
-               spinLimiter.acquire();
-
-               Thread.yield();
-
-               spinLimiter.release();
-            }
-            catch (InterruptedException e)
-            {
-               throw new ActiveMQInterruptedException(e);
-            }
-         }
-      }
-
-      /**
-       * We will attempt to use sleep only if the system supports nano-sleep
-       * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
-       * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
-       */
-      private void sleepIfPossible()
-      {
-         if (isUseSleep())
-         {
-            if (checks < MAX_CHECKS_ON_SLEEP)
-            {
-               timeBefore = System.nanoTime();
-            }
-
-            try
-            {
-               sleep(sleepMillis, sleepNanos);
-            }
-            catch (InterruptedException e)
-            {
-               throw new ActiveMQInterruptedException(e);
-            }
-            catch (Exception e)
-            {
-               setUseSleep(false);
-               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
-            }
-
-            if (checks < MAX_CHECKS_ON_SLEEP)
-            {
-               long realTimeSleep = System.nanoTime() - timeBefore;
-
-               // I'm letting the real time to be up to 50% than the requested sleep.
-               if (realTimeSleep > timeout * 1.5)
-               {
-                  failedChecks++;
-               }
-
-               if (++checks >= MAX_CHECKS_ON_SLEEP)
-               {
-                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5)
-                  {
-                     ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
-                     setUseSleep(false);
-                  }
-               }
-            }
-         }
-      }
-
-      public void close()
-      {
-         closed = true;
-      }
-   }
-
-   /**
-    * Sub classes (tests basically) can use this to override how the sleep is being done
-    *
-    * @param sleepMillis
-    * @param sleepNanos
-    * @throws InterruptedException
-    */
-   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException
-   {
-      Thread.sleep(sleepMillis, sleepNanos);
-   }
-
-   /**
-    * Sub classes (tests basically) can use this to override disabling spinning
-    */
-   protected void stopSpin()
-   {
-      if (spinning)
-      {
-         try
-         {
-            // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
-            // when the buffer is inactive
-            spinLimiter.acquire();
-         }
-         catch (InterruptedException e)
-         {
-            throw new ActiveMQInterruptedException(e);
-         }
-
-         spinning = false;
-      }
-   }
-
-
-   /**
-    * Sub classes (tests basically) can use this to override disabling spinning
-    */
-   protected void startSpin()
-   {
-      if (!spinning)
-      {
-         spinLimiter.release();
-
-         spinning = true;
-      }
-   }
-
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java
deleted file mode 100644
index f219f08..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-
-public interface TimedBufferObserver
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOAsyncTask> callbacks);
-
-   /** Return the number of remaining bytes that still fit on the observer (file) */
-   int getRemainingBytes();
-
-   ByteBuffer newBuffer(int size, int limit);
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java
index 4035202..140927e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.journal.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 
-public class TransactionCallback implements IOAsyncTask
+public class TransactionCallback implements IOCallback
 {
    private final ReusableLatch countLatch = new ReusableLatch();
 
@@ -33,7 +33,7 @@ public class TransactionCallback implements IOAsyncTask
 
    private int done = 0;
 
-   private volatile IOAsyncTask delegateCompletion;
+   private volatile IOCallback delegateCompletion;
 
    public void countUp()
    {
@@ -46,7 +46,7 @@ public class TransactionCallback implements IOAsyncTask
       countLatch.countDown();
       if (++done == up.get() && delegateCompletion != null)
       {
-         final IOAsyncTask delegateToCall = delegateCompletion;
+         final IOCallback delegateToCall = delegateCompletion;
          // We need to set the delegateCompletion to null first or blocking commits could miss a callback
          // What would affect mainly tests
          delegateCompletion = null;
@@ -81,7 +81,7 @@ public class TransactionCallback implements IOAsyncTask
    /**
     * @return the delegateCompletion
     */
-   public IOAsyncTask getDelegateCompletion()
+   public IOCallback getDelegateCompletion()
    {
       return delegateCompletion;
    }
@@ -89,7 +89,7 @@ public class TransactionCallback implements IOAsyncTask
    /**
     * @param delegateCompletion the delegateCompletion to set
     */
-   public void setDelegateCompletion(final IOAsyncTask delegateCompletion)
+   public void setDelegateCompletion(final IOCallback delegateCompletion)
    {
       this.delegateCompletion = delegateCompletion;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java
new file mode 100644
index 0000000..82d4502
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.aio;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** This will emulate callbacks out of order from libaio*/
+public class CallbackOrderTest
+{
+
+   @Rule
+   public TemporaryFolder temporaryFolder;
+
+   public CallbackOrderTest()
+   {
+      File parent = new File("./target");
+      parent.mkdirs();
+      temporaryFolder = new TemporaryFolder(parent);
+   }
+
+   /** This method will make sure callbacks will come back in order even when out order from libaio */
+   @Test
+   public void testCallbackOutOfOrder() throws Exception
+   {
+      AIOSequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+      AIOSequentialFile file = (AIOSequentialFile)factory.createSequentialFile("test.bin");
+
+      final AtomicInteger count = new AtomicInteger(0);
+
+      IOCallback callback = new IOCallback()
+      {
+         @Override
+         public void done()
+         {
+            count.incrementAndGet();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage)
+         {
+
+         }
+      };
+
+      ArrayList<AIOSequentialFileFactory.AIOSequentialCallback> list = new ArrayList<>();
+
+      // We will repeat the teset a few times, increasing N
+      // to increase possibility of issues due to reuse of callbacks
+      for (int n = 1; n < 100; n++)
+      {
+         System.out.println("n = " + n);
+         int N = n;
+         count.set(0);
+         list.clear();
+         for (int i = 0; i < N; i++)
+         {
+            list.add(file.getCallback(callback, null));
+         }
+
+
+         for (int i = N - 1; i >= 0; i--)
+         {
+            list.get(i).done();
+         }
+
+         Assert.assertEquals(N, count.get());
+         Assert.assertEquals(0, file.pendingCallbackList.size());
+         Assert.assertTrue(file.pendingCallbackList.isEmpty());
+      }
+
+      factory.stop();
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java
----------------------------------------------------------------------
diff --git a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java
index 78154b6..bae871e 100644
--- a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java
+++ b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java
@@ -250,6 +250,7 @@ public class ActiveMQCreatePlugin extends AbstractMojo
          add(listCommands, "--failover-on-shutdown");
       }
 
+      add(listCommands, "--no-sync-test");
       add(listCommands, "--verbose");
 
       add(listCommands, instance.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/bin/libartemis-native-32.so
----------------------------------------------------------------------
diff --git a/artemis-native/bin/libartemis-native-32.so b/artemis-native/bin/libartemis-native-32.so
index 7178069..df4b560 100755
Binary files a/artemis-native/bin/libartemis-native-32.so and b/artemis-native/bin/libartemis-native-32.so differ

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/bin/libartemis-native-64.so
----------------------------------------------------------------------
diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so
index 1c4983c..aec757a 100755
Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-native/pom.xml b/artemis-native/pom.xml
index abd5f0d..0206166 100644
--- a/artemis-native/pom.xml
+++ b/artemis-native/pom.xml
@@ -32,6 +32,30 @@
          <artifactId>artemis-commons</artifactId>
          <version>${project.version}</version>
       </dependency>
+
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+         <scope>provided</scope>
+         <optional>true</optional>
+      </dependency>
+
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logmanager</groupId>
+         <artifactId>jboss-logmanager</artifactId>
+         <scope>test</scope>
+      </dependency>
+
+      <dependency>
+         <groupId>junit</groupId>
+         <artifactId>junit</artifactId>
+         <scope>test</scope>
+      </dependency>
+
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOController.cpp
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/AIOController.cpp b/artemis-native/src/main/c/AIOController.cpp
deleted file mode 100644
index a61bf04..0000000
--- a/artemis-native/src/main/c/AIOController.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-#include <string>
-#include "AIOController.h"
-#include "JavaUtilities.h"
-#include "JAIODatatypes.h"
-
-AIOController::AIOController(std::string fileName, int maxIO) : logger(0), fileOutput(fileName, this, maxIO) 
-{
-}
-
-void AIOController::log(THREAD_CONTEXT threadContext, short level, const char * message)
-{
-	jmethodID methodID = 0;
-	
-	switch (level)
-	{
-	case 0: methodID = loggerError; break;
-	case 1: methodID = loggerWarn; break;
-	case 2: methodID = loggerInfo; break;
-	case 3: methodID = loggerDebug; break;
-	default: methodID = loggerDebug; break;
-	}
-
-#ifdef DEBUG
-	fprintf (stderr,"Callig log methodID=%ld, message=%s, logger=%ld, threadContext = %ld\n", (long) methodID, message, (long) logger, (long) threadContext); fflush(stderr);
-#endif
-	threadContext->CallVoidMethod(logger,methodID,threadContext->NewStringUTF(message));
-}
-
-
-void AIOController::destroy(THREAD_CONTEXT context)
-{
-	if (logger != 0)
-	{
-		context->DeleteGlobalRef(logger);
-	}
-}
-
-/*
- * level = 0-error, 1-warn, 2-info, 3-debug
- */
-
-
-AIOController::~AIOController()
-{
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOController.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/AIOController.h b/artemis-native/src/main/c/AIOController.h
deleted file mode 100644
index 913565f..0000000
--- a/artemis-native/src/main/c/AIOController.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-#ifndef AIOCONTROLLER_H_
-#define AIOCONTROLLER_H_
-#include <jni.h>
-#include <string>
-#include "JAIODatatypes.h"
-#include "AsyncFile.h"
-
-class AIOController
-{
-public:
-	jmethodID done;
-	jmethodID error;
-
-	jobject logger;
-	
-	jmethodID loggerError;
-	jmethodID loggerWarn;
-	jmethodID loggerDebug;
-	jmethodID loggerInfo;
-
-	/*
-	 * level = 0-error, 1-warn, 2-info, 3-debug
-	 */
-	void log(THREAD_CONTEXT threadContext, short level, const char * message);
-	
-	AsyncFile fileOutput;
-	
-	void destroy(THREAD_CONTEXT context);
-	
-	AIOController(std::string fileName, int maxIO);
-	virtual ~AIOController();
-};
-#endif /*AIOCONTROLLER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOException.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/AIOException.h b/artemis-native/src/main/c/AIOException.h
deleted file mode 100644
index 98745a2..0000000
--- a/artemis-native/src/main/c/AIOException.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-
-#ifndef AIOEXCEPTION_H_
-#define AIOEXCEPTION_H_
-
-#include <exception>
-#include <string>
-
-
-#define NATIVE_ERROR_INTERNAL 200
-#define NATIVE_ERROR_INVALID_BUFFER 201
-#define NATIVE_ERROR_NOT_ALIGNED 202
-#define NATIVE_ERROR_CANT_INITIALIZE_AIO 203
-#define NATIVE_ERROR_CANT_RELEASE_AIO 204
-#define NATIVE_ERROR_CANT_OPEN_CLOSE_FILE 205
-#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
-#define NATIVE_ERROR_PREALLOCATE_FILE 208
-#define NATIVE_ERROR_ALLOCATE_MEMORY 209
-#define NATIVE_ERROR_IO 006
-#define NATIVE_ERROR_AIO_FULL 211
-
-
-class AIOException : public std::exception
-{
-private:
-	int errorCode;
-	std::string message;
-public:
-	AIOException(int _errorCode, std::string  _message) throw() : errorCode(_errorCode), message(_message)
-	{
-		errorCode = _errorCode;
-		message = _message;
-	}
-	
-	AIOException(int _errorCode, const char * _message) throw ()
-	{
-		message = std::string(_message);
-		errorCode = _errorCode;
-	}
-	
-	virtual ~AIOException() throw()
-	{
-		
-	}
-	
-	int inline getErrorCode()
-	{
-		return errorCode;
-	}
-	
-    const char* what() const throw()
-    {
-    	return message.data();
-    }
-	
-};
-
-#endif /*AIOEXCEPTION_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AsyncFile.cpp
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/AsyncFile.cpp b/artemis-native/src/main/c/AsyncFile.cpp
deleted file mode 100644
index 2385a0d..0000000
--- a/artemis-native/src/main/c/AsyncFile.cpp
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef _GNU_SOURCE
-#define _GNU_SOURCE
-#endif
-
-
-#include <stdlib.h>
-#include <list>
-#include <iostream>
-#include <sstream>
-#include <memory.h>
-#include <errno.h>
-#include <libaio.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <sys/stat.h>
-#include "AsyncFile.h"
-#include "AIOController.h"
-#include "AIOException.h"
-#include "pthread.h"
-#include "LockClass.h"
-#include "CallbackAdapter.h"
-#include "LockClass.h"
-
-//#define DEBUG
-
-#define WAIT_FOR_SPOT 10000
-#define TRIES_BEFORE_WARN 0
-#define TRIES_BEFORE_ERROR 500
-
-
-std::string io_error(int rc)
-{
-	std::stringstream buffer;
-
-	if (rc == -ENOSYS)
-		buffer << "AIO not in this kernel";
-	else
-		buffer << "Error:= " << strerror((int)-rc);
-
-	return buffer.str();
-}
-
-
-AsyncFile::AsyncFile(std::string & _fileName, AIOController * _controller, int _maxIO) : aioContext(0), events(0), fileHandle(0), controller(_controller), pollerRunning(0)
-{
-	::pthread_mutex_init(&fileMutex,0);
-	::pthread_mutex_init(&pollerMutex,0);
-
-	maxIO = _maxIO;
-	fileName = _fileName;
-	if (io_queue_init(maxIO, &aioContext))
-	{
-		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
-	}
-
-	fileHandle = ::open(fileName.data(),  O_RDWR | O_CREAT | O_DIRECT, 0666);
-	if (fileHandle < 0)
-	{
-		io_queue_release(aioContext);
-		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
-	}
-
-#ifdef DEBUG
-	fprintf (stderr,"File Handle %d", fileHandle);
-#endif
-
-	events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
-
-	if (events == 0)
-	{
-		throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
-	}
-
-}
-
-AsyncFile::~AsyncFile()
-{
-	if (io_queue_release(aioContext))
-	{
-		throw AIOException(NATIVE_ERROR_CANT_RELEASE_AIO,"Can't release aio");
-	}
-	if (::close(fileHandle))
-	{
-		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE,"Can't close file");
-	}
-	free(events);
-	::pthread_mutex_destroy(&fileMutex);
-	::pthread_mutex_destroy(&pollerMutex);
-}
-
-int isException (THREAD_CONTEXT threadContext)
-{
-	return JNI_ENV(threadContext)->ExceptionOccurred() != 0;
-}
-
-void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
-{
-
-	LockClass lock(&pollerMutex);
-	pollerRunning=1;
-
-
-	while (pollerRunning)
-	{
-		if (isException(threadContext))
-		{
-			return;
-		}
-		int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
-
-
-#ifdef DEBUG
-		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
-#endif
-
-		if (result > 0)
-		{
-
-#ifdef DEBUG
-			fprintf (stdout, "Received %d events\n", result);
-			fflush(stdout);
-#endif
-		}
-
-		for (int i=0; i<result; i++)
-		{
-
-			struct iocb * iocbp = events[i].obj;
-
-			if (iocbp->data == (void *) -1)
-			{
-				pollerRunning = 0;
-#ifdef DEBUG
-				controller->log(threadContext, 2, "Received poller request to stop");
-#endif
-			}
-			else
-			{
-				CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
-
-				long result = events[i].res;
-				if (result < 0)
-				{
-					std::string strerror = io_error((int)result);
-					adapter->onError(threadContext, result, strerror);
-				}
-				else
-				{
-					adapter->done(threadContext);
-				}
-			}
-
-			delete iocbp;
-		}
-	}
-#ifdef DEBUG
-	controller->log(threadContext, 2, "Poller finished execution");
-#endif
-}
-
-
-void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar)
-{
-
-	if (size % ALIGNMENT != 0)
-	{
-		throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
-	}
-
-	void * preAllocBuffer = 0;
-	if (posix_memalign(&preAllocBuffer, 512, size))
-	{
-		throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
-	}
-
-	memset(preAllocBuffer, fillChar, size);
-
-
-	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
-
-	for (int i=0; i<blocks; i++)
-	{
-		if (::write(fileHandle, preAllocBuffer, size)<0)
-		{
-			throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
-		}
-	}
-
-	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
-
-	free (preAllocBuffer);
-}
-
-
-/** Write directly to the file without using libaio queue */
-void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer)
-{
-	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
-
-	if (::write(fileHandle, buffer, size)<0)
-	{
-		throw AIOException (NATIVE_ERROR_IO, "Error writing file");
-	}
-	
-	if (::fsync(fileHandle) < 0)
-	{
-		throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file");
-	}
-	
-
-}
-
-
-void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
-{
-
-	struct iocb * iocb = new struct iocb();
-	::io_prep_pwrite(iocb, fileHandle, buffer, size, position);
-	iocb->data = (void *) adapter;
-
-	int tries = 0;
-	int result = 0;
-
-	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
-	{
-#ifdef DEBUG
-		fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries);
-#endif
-		tries ++;
-		if (tries > TRIES_BEFORE_WARN)
-		{
-#ifdef DEBUG
-		    fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries);
-#endif
-			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
-		}
-
-		if (tries > TRIES_BEFORE_ERROR)
-		{
-#ifdef DEBUG
-		    fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries);
-#endif
-			throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit");
-		}
-		::usleep(WAIT_FOR_SPOT);
-	}
-
-	if (result<0)
-	{
-		std::stringstream str;
-		str<< "Problem on submit block, errorCode=" << result;
-		throw AIOException (NATIVE_ERROR_IO, str.str());
-	}
-}
-
-void AsyncFile::read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
-{
-
-	struct iocb * iocb = new struct iocb();
-	::io_prep_pread(iocb, fileHandle, buffer, size, position);
-	iocb->data = (void *) adapter;
-
-	int tries = 0;
-	int result = 0;
-
-	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
-	{
-#ifdef DEBUG
-		fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries);
-#endif
-		tries ++;
-		if (tries > TRIES_BEFORE_WARN)
-		{
-#ifdef DEBUG
-		    fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries);
-#endif
-			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
-		}
-
-		if (tries > TRIES_BEFORE_ERROR)
-		{
-#ifdef DEBUG
-		    fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries);
-#endif
-			throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit");
-		}
-		::usleep(WAIT_FOR_SPOT);
-	}
-
-	if (result<0)
-	{
-		std::stringstream str;
-		str<< "Problem on submit block, errorCode=" << result;
-		throw AIOException (NATIVE_ERROR_IO, str.str());
-	}
-}
-
-long AsyncFile::getSize()
-{
-	struct stat statBuffer;
-
-	if (fstat(fileHandle, &statBuffer) < 0)
-	{
-		return -1l;
-	}
-	return statBuffer.st_size;
-}
-
-
-void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
-{
-	pollerRunning = 0;
-
-
-	struct iocb * iocb = new struct iocb();
-	::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
-	iocb->data = (void *) -1;
-
-	int result = 0;
-
-	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
-	{
-		fprintf(stderr, "Couldn't send request to stop poller, trying again");
-		controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
-		::usleep(WAIT_FOR_SPOT);
-	}
-
-	// Waiting the Poller to finish (by giving up the lock)
-	LockClass lock(&pollerMutex);
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AsyncFile.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/AsyncFile.h b/artemis-native/src/main/c/AsyncFile.h
deleted file mode 100644
index 71281c9..0000000
--- a/artemis-native/src/main/c/AsyncFile.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef FILEOUTPUT_H_
-#define FILEOUTPUT_H_
-
-#include <string>
-#include <libaio.h>
-#include <stdlib.h>
-#include <pthread.h>
-#include "JAIODatatypes.h"
-#include "AIOException.h"
-
-class AIOController;
-
-class CallbackAdapter;
-
-/** Author: Clebert Suconic at Redhat dot com*/
-class AsyncFile
-{
-private:
-	io_context_t aioContext;
-	struct io_event *events; 
-	int fileHandle;
-	std::string fileName;
-	
-	pthread_mutex_t fileMutex;
-	pthread_mutex_t pollerMutex;
-	
-	AIOController * controller;
-	
-	bool pollerRunning;
-	
-	int maxIO;
-	
-public:
-	AsyncFile(std::string & _fileName, AIOController * controller, int maxIO);
-	virtual ~AsyncFile();
-	
-	void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
-	
-	/** Write directly to the file without using libaio queue */
-	void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer);
-	
-	void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
-	
-	int getHandle()
-	{
-		return fileHandle;
-	}
-
-	long getSize();
-
-	inline void * newBuffer(int size)
-	{
-		void * buffer = 0;
-		if (::posix_memalign(&buffer, 512, size))
-		{
-			throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
-		}
-		return buffer;
-		
-	}
-
-	inline void destroyBuffer(void * buffer)
-	{
-		::free(buffer);
-	}
-
-	
-	// Finishes the polling thread (if any) and return
-	void stopPoller(THREAD_CONTEXT threadContext);
-	void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar);
-	
-	void pollEvents(THREAD_CONTEXT threadContext);
-	
-};
-
-#endif /*FILEOUTPUT_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/CMakeLists.txt b/artemis-native/src/main/c/CMakeLists.txt
index 0a45fd6..beef8da 100644
--- a/artemis-native/src/main/c/CMakeLists.txt
+++ b/artemis-native/src/main/c/CMakeLists.txt
@@ -30,35 +30,21 @@ endif()
 
 # you may want to remove this next line for debugging
 # -O3 would make inline debug hard
+#ADD_DEFINITIONS("-O3 -Wall -z execstack")
 ADD_DEFINITIONS("-O3 -Wall")
-#ADD_DEFINITIONS("-fdump-tree-all -Wall -pg -g -mstack-protector-guard=guard")
+#ADD_DEFINITIONS("-fdump-tree-all -Wall -pg -g")
 
 find_library(LIBAIO NAMES aio)
 
 INCLUDE_DIRECTORIES(. ${JNI_INCLUDE_DIRS})
 
 ADD_CUSTOM_COMMAND(
-    OUTPUT org_apache_activemq_artemis_core_libaio_Native.h
-    COMMAND javah -cp ../java/ org.apache.activemq.artemis.core.libaio.Native
-    DEPENDS ../java/org/apache/activemq/artemis/core/libaio/Native.java
+    OUTPUT org_apache_activemq_artemis_jlibaio_LibaioContext.h
+    COMMAND javah -cp ../java/ org.apache.activemq.artemis.jlibaio.LibaioContext
+    DEPENDS ../java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
 )
 
-ADD_LIBRARY(artemis-native SHARED
-                AIOController.cpp
-                AIOController.h
-                AIOException.h
-                AsyncFile.cpp
-                AsyncFile.h
-                CallbackAdapter.h
-                JAIODatatypes.h
-                JavaUtilities.cpp
-                JavaUtilities.h
-                JNI_AsynchronousFileImpl.cpp
-                JNICallbackAdapter.cpp
-                JNICallbackAdapter.h
-                LockClass.h
-                Version.h
-                org_apache_activemq_artemis_core_libaio_Native.h)
+ADD_LIBRARY(artemis-native SHARED org_apache_activemq_artemis_jlibaio_LibaioContext.c org_apache_activemq_artemis_jlibaio_LibaioContext.h exception_helper.h)
 
 target_link_libraries(artemis-native aio)
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/CallbackAdapter.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/CallbackAdapter.h b/artemis-native/src/main/c/CallbackAdapter.h
deleted file mode 100644
index e8b67da..0000000
--- a/artemis-native/src/main/c/CallbackAdapter.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef BUFFERADAPTER_H_
-#define BUFFERADAPTER_H_
-
-#include <iostream>
-
-#include "JAIODatatypes.h"
-
-class CallbackAdapter
-{
-private:
-
-public:
-	CallbackAdapter()
-	{
-		
-	}
-	virtual ~CallbackAdapter()
-	{
-		
-	}
-	
-	virtual void done(THREAD_CONTEXT ) = 0;
-	virtual void onError(THREAD_CONTEXT , long , std::string )=0;
-};
-#endif /*BUFFERADAPTER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JAIODatatypes.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JAIODatatypes.h b/artemis-native/src/main/c/JAIODatatypes.h
deleted file mode 100644
index b611c2b..0000000
--- a/artemis-native/src/main/c/JAIODatatypes.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef JAIODATATYPES_H_
-#define JAIODATATYPES_H_
-
-#include <jni.h>
-
-#define THREAD_CONTEXT JNIEnv *&
-#define JNI_ENV(pointer) pointer 
-#define ALIGNMENT 512
-
-
-#endif /*JAIODATATYPES_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNICallbackAdapter.cpp
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JNICallbackAdapter.cpp b/artemis-native/src/main/c/JNICallbackAdapter.cpp
deleted file mode 100644
index 0f4cef4..0000000
--- a/artemis-native/src/main/c/JNICallbackAdapter.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#include <jni.h>
-#include "JNICallbackAdapter.h"
-#include <iostream>
-#include "JavaUtilities.h"
-
-jobject nullObj = NULL;
-
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
-{
-	controller = _controller;
-
-	sequence = _sequence;
-
-	callback = _callback;
-
-	fileController = _fileController;
-
-	bufferReference = _bufferReference;
-
-	isRead = _isRead;
-
-}
-
-JNICallbackAdapter::~JNICallbackAdapter()
-{
-}
-
-void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
-{
-	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,  sequence, isRead ? nullObj : bufferReference); 
-
-	release(threadContext);
-}
-
-void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode, std::string error)
-{
-	controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
-
-	jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
-
-	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
-
-	release(threadContext);
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNICallbackAdapter.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JNICallbackAdapter.h b/artemis-native/src/main/c/JNICallbackAdapter.h
deleted file mode 100644
index 5d32620..0000000
--- a/artemis-native/src/main/c/JNICallbackAdapter.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef JNIBUFFERADAPTER_H_
-#define JNIBUFFERADAPTER_H_
-
-#include <iostream>
-
-#include "CallbackAdapter.h"
-#include "AIOController.h"
-#include "JAIODatatypes.h"
-
-
-class JNICallbackAdapter : public CallbackAdapter
-{
-private:
-
-	AIOController * controller;
-	
-	jobject callback;
-	
-	jobject fileController;
-	
-	jobject bufferReference;
-	
-	jlong sequence;
-	
-	// Is this a read operation
-	short isRead;
-
-	void release(THREAD_CONTEXT threadContext)
-	{
-		JNI_ENV(threadContext)->DeleteGlobalRef(callback);
-		JNI_ENV(threadContext)->DeleteGlobalRef(fileController);
-		JNI_ENV(threadContext)->DeleteGlobalRef(bufferReference);
-		delete this;
-		return;
-	}
-	
-	
-public:
-	// _ob must be a global Reference (use createGloblReferente before calling the constructor)
-	JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
-	virtual ~JNICallbackAdapter();
-
-	void done(THREAD_CONTEXT threadContext);
-
-	void onError(THREAD_CONTEXT , long , std::string );
-
-	
-};
-#endif /*JNIBUFFERADAPTER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp b/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp
deleted file mode 100644
index 0334a7c..0000000
--- a/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#include <jni.h>
-#include <stdlib.h>
-#include <iostream>
-#include <stdio.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <string>
-#include <time.h>
-#include <sys/file.h>
-
-#include "org_apache_activemq_artemis_core_libaio_Native.h"
-
-
-#include "JavaUtilities.h"
-#include "AIOController.h"
-#include "JNICallbackAdapter.h"
-#include "AIOException.h"
-#include "Version.h"
-
-
-// This value is set here globally, to avoid passing stuff on stack between java and the native layer on every sleep call
-struct timespec nanoTime;
-
-inline AIOController * getController(JNIEnv *env, jobject & controllerAddress)
-{
-     return (AIOController *) env->GetDirectBufferAddress(controllerAddress);
-}
-
-/* Inaccessible static: log */
-/* Inaccessible static: totalMaxIO */
-/* Inaccessible static: loaded */
-/* Inaccessible static: EXPECTED_NATIVE_VERSION */
-/*
- * Class:     org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl
- * Method:    openFile
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_openFile
-  (JNIEnv * env , jclass , jstring jstrFileName)
-{
-	std::string fileName = convertJavaString(env, jstrFileName);
-
-    return open(fileName.data(), O_RDWR | O_CREAT, 0666);
-}
-
-/*
- * Class:     org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl
- * Method:    closeFile
- * Signature: (I)V
- */
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_closeFile
-  (JNIEnv * , jclass , jint handle)
-{
-   close(handle);
-}
-
-/*
- * Class:     org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl
- * Method:    flock
- * Signature: (I)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_flock
-  (JNIEnv * , jclass , jint handle)
-{
-    return flock(handle, LOCK_EX | LOCK_NB) == 0;
-}
-
-
-
-/*
- * Class:     org_jboss_jaio_libaioimpl_LibAIOController
- * Method:    init
- * Signature: (Ljava/lang/String;Ljava/lang/Class;)J
- */
-JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_init
-  (JNIEnv * env, jclass, jclass controllerClazz, jstring jstrFileName, jint maxIO, jobject logger)
-{
-	AIOController * controller = 0;
-	try
-	{
-		std::string fileName = convertJavaString(env, jstrFileName);
-
-		controller = new AIOController(fileName, (int) maxIO);
-		controller->done = env->GetMethodID(controllerClazz,"callbackDone","(Lorg/apache/activemq/artemis/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
-		if (!controller->done)
-		{
-		   throwException (env, -1, "can't get callbackDone method");
-		   return 0;
-		}
-
-		controller->error = env->GetMethodID(controllerClazz, "callbackError", "(Lorg/apache/activemq/artemis/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
-		if (!controller->done)
-		{
-		   throwException (env, -1, "can't get callbackError method");
-		   return 0;
-		}
-
-        jclass loggerClass = env->GetObjectClass(logger);
-
-        if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V"))) return 0;
-        if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V"))) return 0;
-        if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V"))) return 0;
-        if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V"))) return 0;
-
-        controller->logger = env->NewGlobalRef(logger);
-
-		return env->NewDirectByteBuffer(controller, 0);
-	}
-	catch (AIOException& e){
-		if (controller != 0)
-		{
-			delete controller;
-		}
-		throwException(env, e.getErrorCode(), e.what());
-		return 0;
-	}
-}
-
-/**
-* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method
-  where the intended reference is now passed as an argument
-*/
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_read
-  (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		void * buffer = env->GetDirectBufferAddress(jbuffer);
-
-		if (buffer == 0)
-		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
-			return;
-		}
-
-		if (((long)buffer) % 512)
-		{
-			throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA");
-			return;
-		}
-
-		CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
-
-		controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-
-// Fast memset on buffer
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_resetBuffer
-  (JNIEnv *env, jclass, jobject jbuffer, jint size)
-{
-	void * buffer = env->GetDirectBufferAddress(jbuffer);
-
-	if (buffer == 0)
-	{
-		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
-		return;
-	}
-
-	memset(buffer, 0, (size_t)size);
-
-}
-
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_destroyBuffer
-  (JNIEnv * env, jclass, jobject jbuffer)
-{
-    if (jbuffer == 0)
-    {
-		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Null Buffer");
-		return;
-    }
-	void *  buffer = env->GetDirectBufferAddress(jbuffer);
-	free(buffer);
-}
-
-JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_newNativeBuffer
-  (JNIEnv * env, jclass, jlong size)
-{
-	try
-	{
-
-		if (size % ALIGNMENT)
-		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
-			return 0;
-		}
-
-
-		// This will allocate a buffer, aligned by 512.
-		// Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
-		void * buffer = 0;
-		if (::posix_memalign(&buffer, 512, size))
-		{
-			throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
-			return 0;
-		}
-
-		memset(buffer, 0, (size_t)size);
-
-		jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
-		return jbuffer;
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-		return 0;
-	}
-}
-
-/**
-* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method
-  where the intended reference is now passed as an argument
-*/
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_write
-  (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong sequence, jlong position, jlong size, jobject jbuffer, jobject callback)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		void * buffer = env->GetDirectBufferAddress(jbuffer);
-
-		if (buffer == 0)
-		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
-			return;
-		}
-
-
-		CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
-
-		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_writeInternal
-  (JNIEnv * env, jclass, jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		void * buffer = env->GetDirectBufferAddress(jbuffer);
-
-		if (buffer == 0)
-		{
-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
-			return;
-		}
-
-		controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer);
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-
-JNIEXPORT void Java_org_apache_activemq_artemis_core_libaio_Native_internalPollEvents
-  (JNIEnv *env, jclass, jobject controllerAddress)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		controller->fileOutput.pollEvents(env);
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_stopPoller
-  (JNIEnv *env, jclass, jobject controllerAddress)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		controller->fileOutput.stopPoller(env);
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_closeInternal
-  (JNIEnv *env, jclass, jobject controllerAddress)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-		controller->destroy(env);
-		delete controller;
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-
-JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_fill
-  (JNIEnv * env, jclass, jobject controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-
-		controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
-
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-	}
-}
-
-
-
-/** It does nothing... just return true to make sure it has all the binary dependencies */
-JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_getNativeVersion
-  (JNIEnv *, jclass)
-
-{
-     return _VERSION_NATIVE_AIO;
-}
-
-
-JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_size0
-  (JNIEnv * env, jclass, jobject controllerAddress)
-{
-	try
-	{
-		AIOController * controller = getController(env, controllerAddress);
-
-		long size = controller->fileOutput.getSize();
-		if (size < 0)
-		{
-			throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed");
-			return -1l;
-		}
-		return size;
-	}
-	catch (AIOException& e)
-	{
-		throwException(env, e.getErrorCode(), e.what());
-		return -1l;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JavaUtilities.cpp
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JavaUtilities.cpp b/artemis-native/src/main/c/JavaUtilities.cpp
deleted file mode 100644
index 10d6099..0000000
--- a/artemis-native/src/main/c/JavaUtilities.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#include <stdio.h>
-#include <iostream>
-#include <string>
-#include "JavaUtilities.h"
-
-
-void throwRuntimeException(JNIEnv * env, const char * message)
-{
-  jclass exceptionClass = env->FindClass("java/lang/RuntimeException");
-  env->ThrowNew(exceptionClass,message);
-  
-}
-
-void throwException(JNIEnv * env, const int code, const char * message)
-{
-  jclass exceptionClass = env->FindClass("org/apache/activemq/artemis/api/core/ActiveMQException");
-  if (exceptionClass==NULL) 
-  {
-     std::cerr << "Couldn't throw exception message:= " << message << "\n";
-     throwRuntimeException (env, "Can't find Exception class");
-     return;
-  }
-
-  jmethodID constructor = env->GetMethodID(exceptionClass, "<init>", "(ILjava/lang/String;)V");
-  if (constructor == NULL)
-  {
-       std::cerr << "Couldn't find the constructor ***";
-       throwRuntimeException (env, "Can't find Constructor for Exception");
-       return;
-  }
-
-  jstring strError = env->NewStringUTF(message);
-  jthrowable ex = (jthrowable)env->NewObject(exceptionClass, constructor, code, strError);
-  env->Throw(ex);
-  
-}
-
-std::string convertJavaString(JNIEnv * env, jstring& jstr)
-{
-	const char * valueStr = env->GetStringUTFChars(jstr, NULL);
-	std::string data(valueStr);
-	env->ReleaseStringUTFChars(jstr, valueStr);
-	return data;
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JavaUtilities.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/JavaUtilities.h b/artemis-native/src/main/c/JavaUtilities.h
deleted file mode 100644
index 53ba870..0000000
--- a/artemis-native/src/main/c/JavaUtilities.h
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef JAVAUTILITIES_H_
-#define JAVAUTILITIES_H_
-#include <string>
-#include <jni.h>
-
-void throwException(JNIEnv * env, const int code, const char * message);
-std::string convertJavaString(JNIEnv * env, jstring& jstr);
-
-#endif /*JAVAUTILITIES_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/LockClass.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/LockClass.h b/artemis-native/src/main/c/LockClass.h
deleted file mode 100644
index 5259919..0000000
--- a/artemis-native/src/main/c/LockClass.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef LOCKCLASS_H_
-#define LOCKCLASS_H_
-
-#include <pthread.h>
-
-class LockClass
-{
-protected:
-    pthread_mutex_t* _m;
-public:
-    inline LockClass(pthread_mutex_t* m) : _m(m)
-    {
-        ::pthread_mutex_lock(_m);
-    }
-    inline ~LockClass()
-    {
-        ::pthread_mutex_unlock(_m);
-    }
-};
-
-
-#endif /*LOCKCLASS_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/Version.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/Version.h b/artemis-native/src/main/c/Version.h
deleted file mode 100644
index 5b521b3..0000000
--- a/artemis-native/src/main/c/Version.h
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements. See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-#ifndef _VERSION_NATIVE_AIO
-
-// This definition needs to match org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION
-// Or else the native module won't be loaded because of version mismatches
-#define _VERSION_NATIVE_AIO 52
-#endif
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/exception_helper.h
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/exception_helper.h b/artemis-native/src/main/c/exception_helper.h
new file mode 100644
index 0000000..d8c7707
--- /dev/null
+++ b/artemis-native/src/main/c/exception_helper.h
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+void throwRuntimeException(JNIEnv* env, char* message);
+void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
+void throwIOException(JNIEnv* env, char* message);
+void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
+void throwClosedChannelException(JNIEnv* env);
+void throwOutOfMemoryError(JNIEnv* env);
+char* exceptionMessage(char* msg, int error);


Mime
View raw message