activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [05/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:35 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java
new file mode 100644
index 0000000..7c6403f
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecordTX.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecordTX extends JournalInternalRecord
+{
+
+   private final long txID;
+
+   private final long id;
+
+   private final EncodingSupport record;
+
+   private final byte recordType;
+
+   private final boolean add;
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    */
+   public JournalAddRecordTX(final boolean add,
+                             final long txID,
+                             final long id,
+                             final byte recordType,
+                             final EncodingSupport record)
+   {
+
+      this.txID = txID;
+
+      this.id = id;
+
+      this.record = record;
+
+      this.recordType = recordType;
+
+      this.add = add;
+   }
+
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (add)
+      {
+         buffer.writeByte(JournalImpl.ADD_RECORD_TX);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.UPDATE_RECORD_TX);
+      }
+
+      buffer.writeInt(fileID);
+
+      buffer.writeByte(compactCount);
+
+      buffer.writeLong(txID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(record.getEncodeSize());
+
+      buffer.writeByte(recordType);
+
+      record.encode(buffer);
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java
new file mode 100644
index 0000000..e7ebb3b
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalCompleteRecordTX.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * <p>
+ * A transaction record (Commit or Prepare), will hold the number of elements the transaction has in
+ * the current file.
+ * <p>
+ * While loading the {@link org.apache.activemq6.core.journal.impl.JournalFile}, the number of operations found is matched against this
+ * number. If for any reason there are missing operations, the transaction will be ignored.
+ * <p>
+ * We can't just use a global counter as reclaiming could delete files after the transaction was
+ * successfully committed. That also means not having a whole file on journal-reload doesn't mean we
+ * have to invalidate the transaction
+ * <p>
+ * The commit operation itself is not included in this total.
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class JournalCompleteRecordTX extends JournalInternalRecord
+{
+   public enum TX_RECORD_TYPE
+   {
+      COMMIT, PREPARE;
+   }
+   private final TX_RECORD_TYPE txRecordType;
+
+   private final long txID;
+
+   private final EncodingSupport transactionData;
+
+   private int numberOfRecords;
+
+   public JournalCompleteRecordTX(final TX_RECORD_TYPE isCommit, final long txID, final EncodingSupport transactionData)
+   {
+      this.txRecordType = isCommit;
+
+      this.txID = txID;
+
+      this.transactionData = transactionData;
+   }
+
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (txRecordType == TX_RECORD_TYPE.COMMIT)
+      {
+         buffer.writeByte(JournalImpl.COMMIT_RECORD);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.PREPARE_RECORD);
+      }
+
+      buffer.writeInt(fileID);
+
+      buffer.writeByte(compactCount);
+
+      buffer.writeLong(txID);
+
+      buffer.writeInt(numberOfRecords);
+
+      if (transactionData != null)
+      {
+         buffer.writeInt(transactionData.getEncodeSize());
+      }
+
+      if (transactionData != null)
+      {
+         transactionData.encode(buffer);
+      }
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public void setNumberOfRecords(final int records)
+   {
+      numberOfRecords = records;
+   }
+
+   @Override
+   public int getNumberOfRecords()
+   {
+      return numberOfRecords;
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      if (txRecordType == TX_RECORD_TYPE.COMMIT)
+      {
+         return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1;
+      }
+      else
+      {
+         return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0) + 1;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java
new file mode 100644
index 0000000..188418e
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecord.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecord
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecord extends JournalInternalRecord
+{
+
+   private final long id;
+
+   /**
+    * @param id
+    */
+   public JournalDeleteRecord(final long id)
+   {
+      this.id = id;
+   }
+
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.DELETE_RECORD);
+
+      buffer.writeInt(fileID);
+
+      buffer.writeByte(compactCount);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_DELETE_RECORD + 1;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java
new file mode 100644
index 0000000..bdbd7ef
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalDeleteRecordTX.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecordTX extends JournalInternalRecord
+{
+
+   private final long txID;
+
+   private final long id;
+
+   private final EncodingSupport record;
+
+   /**
+    * @param txID
+    * @param id
+    * @param record
+    */
+   public JournalDeleteRecordTX(final long txID, final long id, final EncodingSupport record)
+   {
+      this.id = id;
+
+      this.txID = txID;
+
+      this.record = record;
+   }
+
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
+
+      buffer.writeInt(fileID);
+
+      buffer.writeByte(compactCount);
+
+      buffer.writeLong(txID);
+
+      buffer.writeLong(id);
+
+      buffer.writeInt(record != null ? record.getEncodeSize() : 0);
+
+      if (record != null)
+      {
+         record.encode(buffer);
+      }
+
+      buffer.writeInt(getEncodeSize());
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0) + 1;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java
new file mode 100644
index 0000000..5b5f707
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalInternalRecord.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+
+/**
+ * A InternalEncoder
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class JournalInternalRecord implements EncodingSupport
+{
+
+   protected int fileID;
+
+   protected byte compactCount;
+
+   public int getFileID()
+   {
+      return fileID;
+   }
+
+   public void setFileID(final int fileID)
+   {
+      this.fileID = fileID;
+   }
+
+   public void decode(final HornetQBuffer buffer)
+   {
+   }
+
+   public void setNumberOfRecords(final int records)
+   {
+   }
+
+   public int getNumberOfRecords()
+   {
+      return 0;
+   }
+
+   public short getCompactCount()
+   {
+      return compactCount;
+   }
+
+   public void setCompactCount(final short compactCount)
+   {
+      if (compactCount > Byte.MAX_VALUE)
+      {
+         this.compactCount = Byte.MAX_VALUE;
+      }
+      else
+      {
+         this.compactCount = (byte)compactCount;
+      }
+   }
+
+   public abstract int getEncodeSize();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java
new file mode 100644
index 0000000..5f07256
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalRollbackRecordTX.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalRollbackRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalRollbackRecordTX extends JournalInternalRecord
+{
+   private final long txID;
+
+   public JournalRollbackRecordTX(final long txID)
+   {
+      this.txID = txID;
+   }
+
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
+      buffer.writeInt(fileID);
+      buffer.writeByte(compactCount);
+      buffer.writeLong(txID);
+      buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1);
+
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return JournalImpl.SIZE_ROLLBACK_RECORD + 1;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java
new file mode 100644
index 0000000..6601263
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalBundle.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.journal;
+
+
+import org.apache.activemq6.api.core.HornetQIOErrorException;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageBundle;
+import org.jboss.logging.Messages;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         3/12/12
+ *
+ * Logger Code 14
+ *
+ * each message id must be 6 digits long starting with 14, the 3rd digit should be 9
+ *
+ * so 149000 to 149999
+ */
+@MessageBundle(projectCode = "HQ")
+public interface HornetQJournalBundle
+{
+   HornetQJournalBundle BUNDLE = Messages.getBundle(HornetQJournalBundle.class);
+
+   @Message(id = 149000, value =  "failed to rename file {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQIOErrorException ioRenameFileError(String name, String newFileName);
+
+   @Message(id = 149001, value =  "Journal data belong to a different version", format = Message.Format.MESSAGE_FORMAT)
+   HornetQIOErrorException journalDifferentVersion();
+
+   @Message(id = 149002, value =  "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user''s manual",
+         format = Message.Format.MESSAGE_FORMAT)
+   HornetQIOErrorException journalFileMisMatch();
+
+   @Message(id = 149003, value =   "File not opened", format = Message.Format.MESSAGE_FORMAT)
+   HornetQIOErrorException fileNotOpened();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java
new file mode 100644
index 0000000..db7a9f3
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/journal/HornetQJournalLogger.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.journal;
+
+import org.apache.activemq6.core.journal.impl.JournalFile;
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.Cause;
+import org.jboss.logging.annotations.LogMessage;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         3/15/12
+ *
+ * Logger Code 14
+ *
+ * each message id must be 6 digits long starting with 14, the 3rd digit donates the level so
+ *
+ * INF0  1
+ * WARN  2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 141000 to 141999
+ */
+@MessageLogger(projectCode = "HQ")
+public interface HornetQJournalLogger extends BasicLogger
+{
+   /**
+    * The journal logger.
+    */
+   HornetQJournalLogger LOGGER = Logger.getMessageLogger(HornetQJournalLogger.class, HornetQJournalLogger.class.getPackage().getName());
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141000, value = "*** running direct journal blast: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void runningJournalBlast(Integer numIts);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141002, value = "starting thread for sync speed test", format = Message.Format.MESSAGE_FORMAT)
+   void startingThread();
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141003, value = "Write rate = {0} bytes / sec or {1} MiB / sec", format = Message.Format.MESSAGE_FORMAT)
+   void writeRate(Double rate, Long l);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141004, value = "Flush rate = {0} flushes / sec", format = Message.Format.MESSAGE_FORMAT)
+   void flushRate(Double rate);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141005, value = "Check Data Files:", format = Message.Format.MESSAGE_FORMAT)
+   void checkFiles();
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141006, value = "Sequence out of order on journal", format = Message.Format.MESSAGE_FORMAT)
+   void seqOutOfOrder();
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141007, value = "Current File on the journal is <= the sequence file.getFileID={0} on the dataFiles" +
+                                 "\nCurrentfile.getFileId={1} while the file.getFileID()={2}" +
+                                 "\nIs same = ({3})",
+            format = Message.Format.MESSAGE_FORMAT)
+   void currentFile(Long fileID, Long id, Long fileFileID, Boolean b);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141008, value = "Free File ID out of order", format = Message.Format.MESSAGE_FORMAT)
+   void fileIdOutOfOrder();
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 141009, value = "A Free File is less than the maximum data", format = Message.Format.MESSAGE_FORMAT)
+   void fileTooSmall();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142000, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT)
+   void incompatibleNativeLibrary();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142001, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}",
+            format = Message.Format.MESSAGE_FORMAT)
+   void couldNotGetLock(String fileName);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142002, value = "Asynchronous File: {0} being finalized with opened state", format = Message.Format.MESSAGE_FORMAT)
+   void fileFinalizedWhileOpen(String fileName);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142003, value = "AIO Callback Error: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void callbackError(String error);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142004, value = "Inconsistency during compacting: CommitRecord ID = {0} for an already committed transaction during compacting",
+         format = Message.Format.MESSAGE_FORMAT)
+   void inconsistencyDuringCompacting(Long transactionID);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142005, value = "Inconsistency during compacting: Delete record being read on an existent record (id={0})",
+         format = Message.Format.MESSAGE_FORMAT)
+   void inconsistencyDuringCompactingDelete(Long recordID);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142006, value = "Could not find add Record information for record {0} during compacting",
+         format = Message.Format.MESSAGE_FORMAT)
+   void compactingWithNoAddRecord(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142007, value = "Can not find record {0} during compact replay",
+         format = Message.Format.MESSAGE_FORMAT)
+   void noRecordDuringCompactReplay(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142008, value = "Could not remove file {0} from the list of data files",
+         format = Message.Format.MESSAGE_FORMAT)
+   void couldNotRemoveFile(JournalFile file);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142009, value = "Deleting {0} as it does not have the configured size",
+         format = Message.Format.MESSAGE_FORMAT)
+   void deletingFile(JournalFile file);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142010, value = "Failed to add file to opened files queue: {0}. This should NOT happen!",
+         format = Message.Format.MESSAGE_FORMAT)
+   void failedToAddFile(JournalFile nextOpenedFile);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142011, value = "Error on reading compacting for {0}",
+         format = Message.Format.MESSAGE_FORMAT)
+   void compactReadError(JournalFile file);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting",
+         format = Message.Format.MESSAGE_FORMAT)
+   void compactMergeError(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142013, value = "Prepared transaction {0} was not considered completed, it will be ignored",
+         format = Message.Format.MESSAGE_FORMAT)
+   void preparedTXIncomplete(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142014, value = "Transaction {0} is missing elements so the transaction is being ignored",
+         format = Message.Format.MESSAGE_FORMAT)
+   void txMissingElements(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142015, value = "Uncommitted transaction with id {0} found and discarded",
+         format = Message.Format.MESSAGE_FORMAT)
+   void uncomittedTxFound(Long id);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds",
+         format = Message.Format.MESSAGE_FORMAT)
+   void couldNotStopCompactor();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds",
+         format = Message.Format.MESSAGE_FORMAT)
+   void couldNotStopJournalExecutor();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142018, value = "Temporary files were left unnatended after a crash on journal directory, deleting invalid files now",
+         format = Message.Format.MESSAGE_FORMAT)
+   void tempFilesLeftOpen();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142019, value =  "Deleting orphaned file {0}", format = Message.Format.MESSAGE_FORMAT)
+   void deletingOrphanedFile(String fileToDelete);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142020, value =  "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorClosingFile(String fileToDelete);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142021, value =  "Error on IO callback, {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorOnIOCallback(String errorMessage);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142022, value =  "Timed out on AIO poller shutdown", format = Message.Format.MESSAGE_FORMAT)
+   void timeoutOnPollerShutdown(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142023, value =  "Executor on file {0} couldn''t complete its tasks in 60 seconds.", format = Message.Format.MESSAGE_FORMAT)
+   void couldNotCompleteTask(@Cause Exception e, String name);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142024, value =  "Error completing callback", format = Message.Format.MESSAGE_FORMAT)
+   void errorCompletingCallback(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142025, value =  "Error calling onError callback", format = Message.Format.MESSAGE_FORMAT)
+   void errorCallingErrorCallback(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142026, value =  "Timed out on AIO writer shutdown", format = Message.Format.MESSAGE_FORMAT)
+   void timeoutOnWriterShutdown(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142027, value =  "Error on writing data! {0} code - {1}", format = Message.Format.MESSAGE_FORMAT)
+   void errorWritingData(@Cause Throwable e, String errorMessage, Integer errorCode);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142028, value =  "Error replaying pending commands after compacting", format = Message.Format.MESSAGE_FORMAT)
+   void errorReplayingCommands(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142029, value =  "Error closing file", format = Message.Format.MESSAGE_FORMAT)
+   void errorClosingFile(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142030, value = "Could not open a file in 60 Seconds", format = Message.Format.MESSAGE_FORMAT)
+   void errorOpeningFile(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142031, value =  "Error retrieving ID part of the file name {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorRetrievingID(@Cause Throwable e, String fileName);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142032, value =  "Error reading journal file", format = Message.Format.MESSAGE_FORMAT)
+   void errorReadingFile(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142033, value =  "Error reinitializing file {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorReinitializingFile(@Cause Throwable e, JournalFile file);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT)
+   void errorSubmittingWrite(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorDeletingFile(Object e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144001, value = "Error starting poller", format = Message.Format.MESSAGE_FORMAT)
+   void errorStartingPoller(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144002, value = "Error pushing opened file", format = Message.Format.MESSAGE_FORMAT)
+   void errorPushingFile(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144003, value = "Error compacting", format = Message.Format.MESSAGE_FORMAT)
+   void errorCompacting(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144004, value = "Error scheduling compacting", format = Message.Format.MESSAGE_FORMAT)
+   void errorSchedulingCompacting(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144005, value = "Failed to performance blast", format = Message.Format.MESSAGE_FORMAT)
+   void failedToPerfBlast(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 144006, value = "IOError code {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   void ioError(final int errorCode, final String errorMessage);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java b/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java
new file mode 100644
index 0000000..742635a
--- /dev/null
+++ b/activemq6-journal/src/test/java/org/apache/activemq6/tests/asyncio/LibaioDependencyCheckTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.tests.asyncio;
+
+import org.junit.Test;
+
+import org.junit.Assert;
+import org.apache.activemq6.core.asyncio.impl.AsynchronousFileImpl;
+
+/**
+ * A LibaioDependencyCheckTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LibaioDependencyCheckTest extends Assert
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Test
+   public void testDependency() throws Exception
+   {
+      if (System.getProperties().get("os.name").equals("Linux"))
+      {
+         assertTrue("Libaio is not available on this platform", AsynchronousFileImpl.isLoaded());
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/README
----------------------------------------------------------------------
diff --git a/activemq6-native/README b/activemq6-native/README
new file mode 100644
index 0000000..63404ef
--- /dev/null
+++ b/activemq6-native/README
@@ -0,0 +1,45 @@
+
+This is a simple tutorial on compiling libHornetQLibAIO.so
+
+DEPENDENCIES
+
+Make sure you install these packages:
+
+- G++ (yum install gcc-c++ or aptitude install g++)
+- Gcc (yum insall gcc or aptitude install gcc)
+- JDK (full JDK)
+
+
+LIBAIO INFORMATION
+
+libaio is part of the kernel project. The library makes system calls on the kernel layer.
+
+This is the project information:
+
+Git Repository:  git://git.kernel.org/pub/scm/libs/libaio/libaio.git
+Mailing List:    linux-aio@kvack.org
+
+
+STEPS TO BUILD
+
+1. Make sure you have JAVA_HOME defined, and pointing to the root of your JDK:
+
+Example:
+ 
+ $> export JAVA_HOME=/usr/share/jdk1.7
+ 
+ 
+2. Call compile-native.sh. Bootstrap will call all the initial scripts you need
+ $>  ./compile-native.sh
+ 
+if you are missing any dependencies, autoconf would tell you what you're missing.
+ 
+
+COMPILED FILE
+
+The produced file will be under ./src/.libs/libHornetQLibAIO.so
+
+
+DOCUMENTATION
+
+The User Manual, chapter 38 (Libaio Native Libraries) will provide more details about our native libraries on libaio.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/bin/libHornetQAIO32.so
----------------------------------------------------------------------
diff --git a/activemq6-native/bin/libHornetQAIO32.so b/activemq6-native/bin/libHornetQAIO32.so
new file mode 100755
index 0000000..cd431fc
Binary files /dev/null and b/activemq6-native/bin/libHornetQAIO32.so differ

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/bin/libHornetQAIO64.so
----------------------------------------------------------------------
diff --git a/activemq6-native/bin/libHornetQAIO64.so b/activemq6-native/bin/libHornetQAIO64.so
new file mode 100755
index 0000000..e7f68bc
Binary files /dev/null and b/activemq6-native/bin/libHornetQAIO64.so differ

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/compile-native.sh
----------------------------------------------------------------------
diff --git a/activemq6-native/compile-native.sh b/activemq6-native/compile-native.sh
new file mode 100755
index 0000000..d37a352
--- /dev/null
+++ b/activemq6-native/compile-native.sh
@@ -0,0 +1 @@
+mvn install -Pnative-build

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/pom.xml
----------------------------------------------------------------------
diff --git a/activemq6-native/pom.xml b/activemq6-native/pom.xml
new file mode 100644
index 0000000..d6929e4
--- /dev/null
+++ b/activemq6-native/pom.xml
@@ -0,0 +1,126 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <name>ActiveMQ6 Native POM</name>
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq6</groupId>
+      <artifactId>activemq6-pom</artifactId>
+      <version>6.0.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>activemq6-native</artifactId>
+   <packaging>${native-package-type}</packaging>
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-commons</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+   </dependencies>
+
+   <build>
+      <resources>
+         <resource>
+            <directory>${basedir}/target/output/</directory>
+         </resource>
+      </resources>
+      <plugins>
+         <plugin>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>copy-resources-32</id>
+                  <phase>validate</phase>
+                  <goals>
+                     <goal>copy-resources</goal>
+                  </goals>
+                  <configuration>
+                     <outputDirectory>${basedir}/target/output/lib/linux-i686/</outputDirectory>
+                     <resources>
+                        <resource>
+                           <directory>bin/</directory>
+                           <includes>
+                              <include>libHornetQAIO32.so</include>
+                           </includes>
+                        </resource>
+                     </resources>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>copy-resources-64</id>
+                  <phase>validate</phase>
+                  <goals>
+                     <goal>copy-resources</goal>
+                  </goals>
+                  <configuration>
+                     <outputDirectory>${basedir}/target/output/lib/linux-x86_64/</outputDirectory>
+                     <resources>
+                        <resource>
+                           <directory>bin/</directory>
+                           <includes>
+                              <include>libHornetQAIO64.so</include>
+                           </includes>
+                        </resource>
+                     </resources>
+                  </configuration>
+               </execution>
+            </executions>
+         </plugin>
+      </plugins>
+   </build>
+
+
+   <profiles>
+      <profile>
+         <id>native-build</id>
+         <properties>
+            <native-package-type>nar</native-package-type>
+         </properties>
+         <build>
+            <plugins>
+               <plugin>
+                  <!-- Not officially released into Central Maven yet,
+                       for now you have to download the snapshot with
+                       git clone https://github.com/maven-nar/nar-maven-plugin.git
+                       cd nar-maven-plugin
+                       mvn install
+                       -->
+                  <groupId>com.github.maven-nar</groupId>
+                  <artifactId>nar-maven-plugin</artifactId>
+                  <version>3.0.0</version>
+                  <extensions>true</extensions>
+                  <configuration>
+                     <java>
+                        <include>true</include>
+                     </java>
+                     <c>
+                        <systemIncludePaths>
+                        </systemIncludePaths>
+                     </c>
+                     <linker>
+                        <sysLibs>
+                           <sysLib>
+                              <name>aio</name>
+                           </sysLib>
+                        </sysLibs>
+                     </linker>
+                     <libraries>
+                        <library>
+                           <type>jni</type>
+                           <narSystemPackage>org.hornetq.core.libaio</narSystemPackage>
+                        </library>
+                     </libraries>
+                  </configuration>
+               </plugin>
+            </plugins>
+         </build>
+      </profile>
+   </profiles>
+
+   <properties>
+      <native-package-type>jar</native-package-type>
+      <hornetq.basedir>${project.basedir}/..</hornetq.basedir>
+   </properties>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOController.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/AIOController.cpp b/activemq6-native/src/main/c/AIOController.cpp
new file mode 100644
index 0000000..e43f15a
--- /dev/null
+++ b/activemq6-native/src/main/c/AIOController.cpp
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+#include <string>
+#include "AIOController.h"
+#include "JavaUtilities.h"
+#include "JAIODatatypes.h"
+
+AIOController::AIOController(std::string fileName, int maxIO) : logger(0), fileOutput(fileName, this, maxIO) 
+{
+}
+
+void AIOController::log(THREAD_CONTEXT threadContext, short level, const char * message)
+{
+	jmethodID methodID = 0;
+	
+	switch (level)
+	{
+	case 0: methodID = loggerError; break;
+	case 1: methodID = loggerWarn; break;
+	case 2: methodID = loggerInfo; break;
+	case 3: methodID = loggerDebug; break;
+	default: methodID = loggerDebug; break;
+	}
+
+#ifdef DEBUG
+	fprintf (stderr,"Callig log methodID=%ld, message=%s, logger=%ld, threadContext = %ld\n", (long) methodID, message, (long) logger, (long) threadContext); fflush(stderr);
+#endif
+	threadContext->CallVoidMethod(logger,methodID,threadContext->NewStringUTF(message));
+}
+
+
+void AIOController::destroy(THREAD_CONTEXT context)
+{
+	if (logger != 0)
+	{
+		context->DeleteGlobalRef(logger);
+	}
+}
+
+/*
+ * level = 0-error, 1-warn, 2-info, 3-debug
+ */
+
+
+AIOController::~AIOController()
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOController.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/AIOController.h b/activemq6-native/src/main/c/AIOController.h
new file mode 100644
index 0000000..135a3f6
--- /dev/null
+++ b/activemq6-native/src/main/c/AIOController.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+#ifndef AIOCONTROLLER_H_
+#define AIOCONTROLLER_H_
+#include <jni.h>
+#include <string>
+#include "JAIODatatypes.h"
+#include "AsyncFile.h"
+
+class AIOController
+{
+public:
+	jmethodID done;
+	jmethodID error;
+
+	jobject logger;
+	
+	jmethodID loggerError;
+	jmethodID loggerWarn;
+	jmethodID loggerDebug;
+	jmethodID loggerInfo;
+
+	/*
+	 * level = 0-error, 1-warn, 2-info, 3-debug
+	 */
+	void log(THREAD_CONTEXT threadContext, short level, const char * message);
+	
+	AsyncFile fileOutput;
+	
+	void destroy(THREAD_CONTEXT context);
+	
+	AIOController(std::string fileName, int maxIO);
+	virtual ~AIOController();
+};
+#endif /*AIOCONTROLLER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AIOException.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/AIOException.h b/activemq6-native/src/main/c/AIOException.h
new file mode 100644
index 0000000..70f7c71
--- /dev/null
+++ b/activemq6-native/src/main/c/AIOException.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+
+#ifndef AIOEXCEPTION_H_
+#define AIOEXCEPTION_H_
+
+#include <exception>
+#include <string>
+
+
+#define NATIVE_ERROR_INTERNAL 200
+#define NATIVE_ERROR_INVALID_BUFFER 201
+#define NATIVE_ERROR_NOT_ALIGNED 202
+#define NATIVE_ERROR_CANT_INITIALIZE_AIO 203
+#define NATIVE_ERROR_CANT_RELEASE_AIO 204
+#define NATIVE_ERROR_CANT_OPEN_CLOSE_FILE 205
+#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
+#define NATIVE_ERROR_PREALLOCATE_FILE 208
+#define NATIVE_ERROR_ALLOCATE_MEMORY 209
+#define NATIVE_ERROR_IO 006
+#define NATIVE_ERROR_AIO_FULL 211
+
+
+class AIOException : public std::exception
+{
+private:
+	int errorCode;
+	std::string message;
+public:
+	AIOException(int _errorCode, std::string  _message) throw() : errorCode(_errorCode), message(_message)
+	{
+		errorCode = _errorCode;
+		message = _message;
+	}
+	
+	AIOException(int _errorCode, const char * _message) throw ()
+	{
+		message = std::string(_message);
+		errorCode = _errorCode;
+	}
+	
+	virtual ~AIOException() throw()
+	{
+		
+	}
+	
+	int inline getErrorCode()
+	{
+		return errorCode;
+	}
+	
+    const char* what() const throw()
+    {
+    	return message.data();
+    }
+	
+};
+
+#endif /*AIOEXCEPTION_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AsyncFile.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/AsyncFile.cpp b/activemq6-native/src/main/c/AsyncFile.cpp
new file mode 100644
index 0000000..20316a6
--- /dev/null
+++ b/activemq6-native/src/main/c/AsyncFile.cpp
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+
+#include <stdlib.h>
+#include <list>
+#include <iostream>
+#include <sstream>
+#include <memory.h>
+#include <errno.h>
+#include <libaio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include "AsyncFile.h"
+#include "AIOController.h"
+#include "AIOException.h"
+#include "pthread.h"
+#include "LockClass.h"
+#include "CallbackAdapter.h"
+#include "LockClass.h"
+
+//#define DEBUG
+
+#define WAIT_FOR_SPOT 10000
+#define TRIES_BEFORE_WARN 0
+#define TRIES_BEFORE_ERROR 500
+
+
+std::string io_error(int rc)
+{
+	std::stringstream buffer;
+
+	if (rc == -ENOSYS)
+		buffer << "AIO not in this kernel";
+	else
+		buffer << "Error:= " << strerror((int)-rc);
+
+	return buffer.str();
+}
+
+
+AsyncFile::AsyncFile(std::string & _fileName, AIOController * _controller, int _maxIO) : aioContext(0), events(0), fileHandle(0), controller(_controller), pollerRunning(0)
+{
+	::pthread_mutex_init(&fileMutex,0);
+	::pthread_mutex_init(&pollerMutex,0);
+
+	maxIO = _maxIO;
+	fileName = _fileName;
+	if (io_queue_init(maxIO, &aioContext))
+	{
+		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
+	}
+
+	fileHandle = ::open(fileName.data(),  O_RDWR | O_CREAT | O_DIRECT, 0666);
+	if (fileHandle < 0)
+	{
+		io_queue_release(aioContext);
+		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
+	}
+
+#ifdef DEBUG
+	fprintf (stderr,"File Handle %d", fileHandle);
+#endif
+
+	events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
+
+	if (events == 0)
+	{
+		throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
+	}
+
+}
+
+AsyncFile::~AsyncFile()
+{
+	if (io_queue_release(aioContext))
+	{
+		throw AIOException(NATIVE_ERROR_CANT_RELEASE_AIO,"Can't release aio");
+	}
+	if (::close(fileHandle))
+	{
+		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE,"Can't close file");
+	}
+	free(events);
+	::pthread_mutex_destroy(&fileMutex);
+	::pthread_mutex_destroy(&pollerMutex);
+}
+
+int isException (THREAD_CONTEXT threadContext)
+{
+	return JNI_ENV(threadContext)->ExceptionOccurred() != 0;
+}
+
+void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
+{
+
+	LockClass lock(&pollerMutex);
+	pollerRunning=1;
+
+
+	while (pollerRunning)
+	{
+		if (isException(threadContext))
+		{
+			return;
+		}
+		int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
+
+
+#ifdef DEBUG
+		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
+#endif
+
+		if (result > 0)
+		{
+
+#ifdef DEBUG
+			fprintf (stdout, "Received %d events\n", result);
+			fflush(stdout);
+#endif
+		}
+
+		for (int i=0; i<result; i++)
+		{
+
+			struct iocb * iocbp = events[i].obj;
+
+			if (iocbp->data == (void *) -1)
+			{
+				pollerRunning = 0;
+#ifdef DEBUG
+				controller->log(threadContext, 2, "Received poller request to stop");
+#endif
+			}
+			else
+			{
+				CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
+
+				long result = events[i].res;
+				if (result < 0)
+				{
+					std::string strerror = io_error((int)result);
+					adapter->onError(threadContext, result, strerror);
+				}
+				else
+				{
+					adapter->done(threadContext);
+				}
+			}
+
+			delete iocbp;
+		}
+	}
+#ifdef DEBUG
+	controller->log(threadContext, 2, "Poller finished execution");
+#endif
+}
+
+
+void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar)
+{
+
+	if (size % ALIGNMENT != 0)
+	{
+		throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
+	}
+
+	void * preAllocBuffer = 0;
+	if (posix_memalign(&preAllocBuffer, 512, size))
+	{
+		throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
+	}
+
+	memset(preAllocBuffer, fillChar, size);
+
+
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+	for (int i=0; i<blocks; i++)
+	{
+		if (::write(fileHandle, preAllocBuffer, size)<0)
+		{
+			throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
+		}
+	}
+
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
+
+	free (preAllocBuffer);
+}
+
+
+/** Write directly to the file without using libaio queue */
+void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer)
+{
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+	if (::write(fileHandle, buffer, size)<0)
+	{
+		throw AIOException (NATIVE_ERROR_IO, "Error writing file");
+	}
+	
+	if (::fsync(fileHandle) < 0)
+	{
+		throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file");
+	}
+	
+
+}
+
+
+void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
+{
+
+	struct iocb * iocb = new struct iocb();
+	::io_prep_pwrite(iocb, fileHandle, buffer, size, position);
+	iocb->data = (void *) adapter;
+
+	int tries = 0;
+	int result = 0;
+
+	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+	{
+#ifdef DEBUG
+		fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries);
+#endif
+		tries ++;
+		if (tries > TRIES_BEFORE_WARN)
+		{
+#ifdef DEBUG
+		    fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries);
+#endif
+			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+		}
+
+		if (tries > TRIES_BEFORE_ERROR)
+		{
+#ifdef DEBUG
+		    fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries);
+#endif
+			throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit");
+		}
+		::usleep(WAIT_FOR_SPOT);
+	}
+
+	if (result<0)
+	{
+		std::stringstream str;
+		str<< "Problem on submit block, errorCode=" << result;
+		throw AIOException (NATIVE_ERROR_IO, str.str());
+	}
+}
+
+void AsyncFile::read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
+{
+
+	struct iocb * iocb = new struct iocb();
+	::io_prep_pread(iocb, fileHandle, buffer, size, position);
+	iocb->data = (void *) adapter;
+
+	int tries = 0;
+	int result = 0;
+
+	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+	{
+#ifdef DEBUG
+		fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries);
+#endif
+		tries ++;
+		if (tries > TRIES_BEFORE_WARN)
+		{
+#ifdef DEBUG
+		    fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries);
+#endif
+			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+		}
+
+		if (tries > TRIES_BEFORE_ERROR)
+		{
+#ifdef DEBUG
+		    fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries);
+#endif
+			throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit");
+		}
+		::usleep(WAIT_FOR_SPOT);
+	}
+
+	if (result<0)
+	{
+		std::stringstream str;
+		str<< "Problem on submit block, errorCode=" << result;
+		throw AIOException (NATIVE_ERROR_IO, str.str());
+	}
+}
+
+long AsyncFile::getSize()
+{
+	struct stat statBuffer;
+
+	if (fstat(fileHandle, &statBuffer) < 0)
+	{
+		return -1l;
+	}
+	return statBuffer.st_size;
+}
+
+
+void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
+{
+	pollerRunning = 0;
+
+
+	struct iocb * iocb = new struct iocb();
+	::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
+	iocb->data = (void *) -1;
+
+	int result = 0;
+
+	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+	{
+		fprintf(stderr, "Couldn't send request to stop poller, trying again");
+		controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
+		::usleep(WAIT_FOR_SPOT);
+	}
+
+	// Waiting the Poller to finish (by giving up the lock)
+	LockClass lock(&pollerMutex);
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/AsyncFile.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/AsyncFile.h b/activemq6-native/src/main/c/AsyncFile.h
new file mode 100644
index 0000000..e74b78e
--- /dev/null
+++ b/activemq6-native/src/main/c/AsyncFile.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef FILEOUTPUT_H_
+#define FILEOUTPUT_H_
+
+#include <string>
+#include <libaio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include "JAIODatatypes.h"
+#include "AIOException.h"
+
+class AIOController;
+
+class CallbackAdapter;
+
+/** Author: Clebert Suconic at Redhat dot com*/
+class AsyncFile
+{
+private:
+	io_context_t aioContext;
+	struct io_event *events; 
+	int fileHandle;
+	std::string fileName;
+	
+	pthread_mutex_t fileMutex;
+	pthread_mutex_t pollerMutex;
+	
+	AIOController * controller;
+	
+	bool pollerRunning;
+	
+	int maxIO;
+	
+public:
+	AsyncFile(std::string & _fileName, AIOController * controller, int maxIO);
+	virtual ~AsyncFile();
+	
+	void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
+	
+	/** Write directly to the file without using libaio queue */
+	void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer);
+	
+	void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
+	
+	int getHandle()
+	{
+		return fileHandle;
+	}
+
+	long getSize();
+
+	inline void * newBuffer(int size)
+	{
+		void * buffer = 0;
+		if (::posix_memalign(&buffer, 512, size))
+		{
+			throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
+		}
+		return buffer;
+		
+	}
+
+	inline void destroyBuffer(void * buffer)
+	{
+		::free(buffer);
+	}
+
+	
+	// Finishes the polling thread (if any) and return
+	void stopPoller(THREAD_CONTEXT threadContext);
+	void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar);
+	
+	void pollEvents(THREAD_CONTEXT threadContext);
+	
+};
+
+#endif /*FILEOUTPUT_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/CallbackAdapter.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/CallbackAdapter.h b/activemq6-native/src/main/c/CallbackAdapter.h
new file mode 100644
index 0000000..e9f7241
--- /dev/null
+++ b/activemq6-native/src/main/c/CallbackAdapter.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef BUFFERADAPTER_H_
+#define BUFFERADAPTER_H_
+
+#include <iostream>
+
+#include "JAIODatatypes.h"
+
+class CallbackAdapter
+{
+private:
+
+public:
+	CallbackAdapter()
+	{
+		
+	}
+	virtual ~CallbackAdapter()
+	{
+		
+	}
+	
+	virtual void done(THREAD_CONTEXT ) = 0;
+	virtual void onError(THREAD_CONTEXT , long , std::string )=0;
+};
+#endif /*BUFFERADAPTER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JAIODatatypes.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JAIODatatypes.h b/activemq6-native/src/main/c/JAIODatatypes.h
new file mode 100644
index 0000000..a0840fa
--- /dev/null
+++ b/activemq6-native/src/main/c/JAIODatatypes.h
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef JAIODATATYPES_H_
+#define JAIODATATYPES_H_
+
+#include <jni.h>
+
+#define THREAD_CONTEXT JNIEnv *&
+#define JNI_ENV(pointer) pointer 
+#define ALIGNMENT 512
+
+
+#endif /*JAIODATATYPES_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNICallbackAdapter.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JNICallbackAdapter.cpp b/activemq6-native/src/main/c/JNICallbackAdapter.cpp
new file mode 100644
index 0000000..2194325
--- /dev/null
+++ b/activemq6-native/src/main/c/JNICallbackAdapter.cpp
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <jni.h>
+#include "JNICallbackAdapter.h"
+#include <iostream>
+#include "JavaUtilities.h"
+
+jobject nullObj = NULL;
+
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+{
+	controller = _controller;
+
+	sequence = _sequence;
+
+	callback = _callback;
+
+	fileController = _fileController;
+
+	bufferReference = _bufferReference;
+
+	isRead = _isRead;
+
+}
+
+JNICallbackAdapter::~JNICallbackAdapter()
+{
+}
+
+void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
+{
+	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,  sequence, isRead ? nullObj : bufferReference); 
+
+	release(threadContext);
+}
+
+void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode, std::string error)
+{
+	controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
+
+	jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
+
+	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
+	release(threadContext);
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNICallbackAdapter.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JNICallbackAdapter.h b/activemq6-native/src/main/c/JNICallbackAdapter.h
new file mode 100644
index 0000000..92404f8
--- /dev/null
+++ b/activemq6-native/src/main/c/JNICallbackAdapter.h
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef JNIBUFFERADAPTER_H_
+#define JNIBUFFERADAPTER_H_
+
+#include <iostream>
+
+#include "CallbackAdapter.h"
+#include "AIOController.h"
+#include "JAIODatatypes.h"
+
+
+class JNICallbackAdapter : public CallbackAdapter
+{
+private:
+
+	AIOController * controller;
+	
+	jobject callback;
+	
+	jobject fileController;
+	
+	jobject bufferReference;
+	
+	jlong sequence;
+	
+	// Is this a read operation
+	short isRead;
+
+	void release(THREAD_CONTEXT threadContext)
+	{
+		JNI_ENV(threadContext)->DeleteGlobalRef(callback);
+		JNI_ENV(threadContext)->DeleteGlobalRef(fileController);
+		JNI_ENV(threadContext)->DeleteGlobalRef(bufferReference);
+		delete this;
+		return;
+	}
+	
+	
+public:
+	// _ob must be a global Reference (use createGloblReferente before calling the constructor)
+	JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
+	virtual ~JNICallbackAdapter();
+
+	void done(THREAD_CONTEXT threadContext);
+
+	void onError(THREAD_CONTEXT , long , std::string );
+
+	
+};
+#endif /*JNIBUFFERADAPTER_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp b/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp
new file mode 100644
index 0000000..018744b
--- /dev/null
+++ b/activemq6-native/src/main/c/JNI_AsynchronousFileImpl.cpp
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <jni.h>
+#include <stdlib.h>
+#include <iostream>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <string>
+#include <time.h>
+#include <sys/file.h>
+
+#include "org_hornetq_core_libaio_Native.h"
+
+
+#include "JavaUtilities.h"
+#include "AIOController.h"
+#include "JNICallbackAdapter.h"
+#include "AIOException.h"
+#include "Version.h"
+
+
+// This value is set here globally, to avoid passing stuff on stack between java and the native layer on every sleep call
+struct timespec nanoTime;
+
+inline AIOController * getController(JNIEnv *env, jobject & controllerAddress)
+{
+     return (AIOController *) env->GetDirectBufferAddress(controllerAddress);
+} 
+
+/* Inaccessible static: log */
+/* Inaccessible static: totalMaxIO */
+/* Inaccessible static: loaded */
+/* Inaccessible static: EXPECTED_NATIVE_VERSION */
+/*
+ * Class:     org_hornetq_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    openFile
+ * Signature: (Ljava/lang/String;)I
+ */
+JNIEXPORT jint JNICALL Java_org_hornetq_core_libaio_Native_openFile
+  (JNIEnv * env , jclass , jstring jstrFileName)
+{
+	std::string fileName = convertJavaString(env, jstrFileName);
+
+    return open(fileName.data(), O_RDWR | O_CREAT, 0666);
+}
+
+/*
+ * Class:     org_hornetq_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    closeFile
+ * Signature: (I)V
+ */
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_closeFile
+  (JNIEnv * , jclass , jint handle)
+{
+   close(handle);
+}
+
+/*
+ * Class:     org_hornetq_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    flock
+ * Signature: (I)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_hornetq_core_libaio_Native_flock
+  (JNIEnv * , jclass , jint handle)
+{
+    return flock(handle, LOCK_EX | LOCK_NB) == 0;
+}
+
+
+
+/*
+ * Class:     org_jboss_jaio_libaioimpl_LibAIOController
+ * Method:    init
+ * Signature: (Ljava/lang/String;Ljava/lang/Class;)J
+ */
+JNIEXPORT jobject JNICALL Java_org_hornetq_core_libaio_Native_init
+  (JNIEnv * env, jclass, jclass controllerClazz, jstring jstrFileName, jint maxIO, jobject logger)
+{
+	AIOController * controller = 0;
+	try
+	{
+		std::string fileName = convertJavaString(env, jstrFileName);
+
+		controller = new AIOController(fileName, (int) maxIO);
+		controller->done = env->GetMethodID(controllerClazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
+		if (!controller->done)
+		{
+		   throwException (env, -1, "can't get callbackDone method");
+		   return 0;
+		}
+
+		controller->error = env->GetMethodID(controllerClazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
+		if (!controller->done)
+		{
+		   throwException (env, -1, "can't get callbackError method");
+		   return 0;
+		}
+
+        jclass loggerClass = env->GetObjectClass(logger);
+
+        if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V"))) return 0;
+        if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V"))) return 0;
+        if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V"))) return 0;
+        if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V"))) return 0;
+
+        controller->logger = env->NewGlobalRef(logger);
+
+		return env->NewDirectByteBuffer(controller, 0);
+	}
+	catch (AIOException& e){
+		if (controller != 0)
+		{
+			delete controller;
+		}
+		throwException(env, e.getErrorCode(), e.what());
+		return 0;
+	}
+}
+
+/**
+* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method
+  where the intended reference is now passed as an argument
+*/
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_read
+  (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+		if (buffer == 0)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+			return;
+		}
+
+		if (((long)buffer) % 512)
+		{
+			throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA");
+			return;
+		}
+
+		CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
+
+		controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+
+// Fast memset on buffer
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_resetBuffer
+  (JNIEnv *env, jclass, jobject jbuffer, jint size)
+{
+	void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+	if (buffer == 0)
+	{
+		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+		return;
+	}
+
+	memset(buffer, 0, (size_t)size);
+
+}
+
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_destroyBuffer
+  (JNIEnv * env, jclass, jobject jbuffer)
+{
+    if (jbuffer == 0)
+    {
+		throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Null Buffer");
+		return;
+    }
+	void *  buffer = env->GetDirectBufferAddress(jbuffer);
+	free(buffer);
+}
+
+JNIEXPORT jobject JNICALL Java_org_hornetq_core_libaio_Native_newNativeBuffer
+  (JNIEnv * env, jclass, jlong size)
+{
+	try
+	{
+
+		if (size % ALIGNMENT)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+			return 0;
+		}
+
+
+		// This will allocate a buffer, aligned by 512.
+		// Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
+		void * buffer = 0;
+		if (::posix_memalign(&buffer, 512, size))
+		{
+			throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+			return 0;
+		}
+
+		memset(buffer, 0, (size_t)size);
+
+		jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+		return jbuffer;
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+		return 0;
+	}
+}
+
+/**
+* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method
+  where the intended reference is now passed as an argument
+*/
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_write
+  (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong sequence, jlong position, jlong size, jobject jbuffer, jobject callback)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+		if (buffer == 0)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+			return;
+		}
+
+
+		CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
+
+		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_writeInternal
+  (JNIEnv * env, jclass, jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+		if (buffer == 0)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+			return;
+		}
+
+		controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+
+JNIEXPORT void Java_org_hornetq_core_libaio_Native_internalPollEvents
+  (JNIEnv *env, jclass, jobject controllerAddress)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		controller->fileOutput.pollEvents(env);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_stopPoller
+  (JNIEnv *env, jclass, jobject controllerAddress)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		controller->fileOutput.stopPoller(env);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_closeInternal
+  (JNIEnv *env, jclass, jobject controllerAddress)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		controller->destroy(env);
+		delete controller;
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+
+JNIEXPORT void JNICALL Java_org_hornetq_core_libaio_Native_fill
+  (JNIEnv * env, jclass, jobject controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+
+		controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
+
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+
+
+/** It does nothing... just return true to make sure it has all the binary dependencies */
+JNIEXPORT jint JNICALL Java_org_hornetq_core_libaio_Native_getNativeVersion
+  (JNIEnv *, jclass)
+
+{
+     return _VERSION_NATIVE_AIO;
+}
+
+
+JNIEXPORT jlong JNICALL Java_org_hornetq_core_libaio_Native_size0
+  (JNIEnv * env, jclass, jobject controllerAddress)
+{
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+
+		long size = controller->fileOutput.getSize();
+		if (size < 0)
+		{
+			throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed");
+			return -1l;
+		}
+		return size;
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+		return -1l;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JavaUtilities.cpp
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JavaUtilities.cpp b/activemq6-native/src/main/c/JavaUtilities.cpp
new file mode 100644
index 0000000..b5ddc57
--- /dev/null
+++ b/activemq6-native/src/main/c/JavaUtilities.cpp
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <stdio.h>
+#include <iostream>
+#include <string>
+#include "JavaUtilities.h"
+
+
+void throwRuntimeException(JNIEnv * env, const char * message)
+{
+  jclass exceptionClass = env->FindClass("java/lang/RuntimeException");
+  env->ThrowNew(exceptionClass,message);
+  
+}
+
+void throwException(JNIEnv * env, const int code, const char * message)
+{
+  jclass exceptionClass = env->FindClass("org/hornetq/api/core/HornetQException");
+  if (exceptionClass==NULL) 
+  {
+     std::cerr << "Couldn't throw exception message:= " << message << "\n";
+     throwRuntimeException (env, "Can't find Exception class");
+     return;
+  }
+
+  jmethodID constructor = env->GetMethodID(exceptionClass, "<init>", "(ILjava/lang/String;)V");
+  if (constructor == NULL)
+  {
+       std::cerr << "Couldn't find the constructor ***";
+       throwRuntimeException (env, "Can't find Constructor for Exception");
+       return;
+  }
+
+  jstring strError = env->NewStringUTF(message);
+  jthrowable ex = (jthrowable)env->NewObject(exceptionClass, constructor, code, strError);
+  env->Throw(ex);
+  
+}
+
+std::string convertJavaString(JNIEnv * env, jstring& jstr)
+{
+	const char * valueStr = env->GetStringUTFChars(jstr, NULL);
+	std::string data(valueStr);
+	env->ReleaseStringUTFChars(jstr, valueStr);
+	return data;
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/JavaUtilities.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/JavaUtilities.h b/activemq6-native/src/main/c/JavaUtilities.h
new file mode 100644
index 0000000..ff5a828
--- /dev/null
+++ b/activemq6-native/src/main/c/JavaUtilities.h
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef JAVAUTILITIES_H_
+#define JAVAUTILITIES_H_
+#include <string>
+#include <jni.h>
+
+void throwException(JNIEnv * env, const int code, const char * message);
+std::string convertJavaString(JNIEnv * env, jstring& jstr);
+
+#endif /*JAVAUTILITIES_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/LockClass.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/LockClass.h b/activemq6-native/src/main/c/LockClass.h
new file mode 100644
index 0000000..de7bfb6
--- /dev/null
+++ b/activemq6-native/src/main/c/LockClass.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef LOCKCLASS_H_
+#define LOCKCLASS_H_
+
+#include <pthread.h>
+
+class LockClass
+{
+protected:
+    pthread_mutex_t* _m;
+public:
+    inline LockClass(pthread_mutex_t* m) : _m(m)
+    {
+        ::pthread_mutex_lock(_m);
+    }
+    inline ~LockClass()
+    {
+        ::pthread_mutex_unlock(_m);
+    }
+};
+
+
+#endif /*LOCKCLASS_H_*/

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/c/Version.h
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/c/Version.h b/activemq6-native/src/main/c/Version.h
new file mode 100644
index 0000000..7204802
--- /dev/null
+++ b/activemq6-native/src/main/c/Version.h
@@ -0,0 +1,8 @@
+
+#ifndef _VERSION_NATIVE_AIO
+
+// This definition needs to match org.hornetq.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION
+// Or else the native module won't be loaded because of version mismatches
+#define _VERSION_NATIVE_AIO 52
+#endif
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java
----------------------------------------------------------------------
diff --git a/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java b/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java
new file mode 100644
index 0000000..c7705d9
--- /dev/null
+++ b/activemq6-native/src/main/java/org/apache/activemq6/core/libaio/Native.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.libaio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQException;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class Native
+{
+   // Functions used for locking files .....
+   public static native int openFile(String fileName);
+
+   public static native void closeFile(int handle);
+
+   public static native boolean flock(int handle);
+   // Functions used for locking files ^^^^^^^^
+
+   public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+
+   public static native void destroyBuffer(ByteBuffer buffer);
+
+   public static native ByteBuffer newNativeBuffer(long size);
+
+   public static native void newInit(Class someClass);
+
+   public static native ByteBuffer init(Class controllerClass, String fileName, int maxIO, Object logger) throws HornetQException;
+
+   public static native long size0(ByteBuffer handle);
+
+   public static native void write(Object thisObject, ByteBuffer handle,
+                             long sequence,
+                             long position,
+                             long size,
+                             ByteBuffer buffer,
+                             Object aioPackageCallback) throws HornetQException;
+
+   /** a direct write to the file without the use of libaio's submit. */
+   public static native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
+   /**
+    *This is using org.apache.activemq6.core.asyncio.AIOCallback
+     */
+   public static native void read(Object thisObject, ByteBuffer handle, long position, long size, ByteBuffer buffer, Object aioPackageCallback) throws HornetQException;
+
+   public static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
+
+   public static native void closeInternal(ByteBuffer handler);
+
+   public static native void stopPoller(ByteBuffer handler);
+
+   /** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
+   public static native int getNativeVersion();
+
+   /** Poll asynchronous events from internal queues */
+   public static native void internalPollEvents(ByteBuffer handler);
+
+   // Inner classes ---------------------------------------------------------------------
+
+}


Mime
View raw message